
CurrentEventsByTag query does not guarantee ordering of events #214

Open
pepite opened this issue Dec 23, 2018 · 9 comments

pepite commented Dec 23, 2018

The current CurrentEventsByTag object queries the events sorted by _id. However, there is no guarantee that an event inserted into the journal later has a greater _id than one inserted earlier.

An ObjectId is encoded with the following specification:

a 4-byte value representing the seconds since the Unix epoch,
a 5-byte random value, and
a 3-byte counter, starting with a random value.

This means that if two events are inserted within the same second, we have no guarantee that the one inserted first sorts first.
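To make this concrete, here is a minimal sketch in plain Scala (no Mongo driver involved; the hex values are made up for illustration). Two hypothetical ObjectIds generated in the same second share the 4-byte timestamp prefix, so their relative sort order is decided by the random middle bytes, not by insertion order:

```scala
object ObjectIdOrderingSketch {
  // Same seconds-since-epoch prefix for both ids (hypothetical value)
  val tsPrefix = "5c1fa0b1"

  // 5-byte random value + 3-byte counter as hex; the id inserted first
  // happened to get the larger random part
  val idInsertedFirst  = tsPrefix + "ff00000000" + "000001"
  val idInsertedSecond = tsPrefix + "0a00000000" + "000001"

  // Lexicographic order on the hex strings mirrors BSON ObjectId comparison
  // here: the document inserted second sorts before the one inserted first.
  def sortedIds: List[String] = List(idInsertedFirst, idInsertedSecond).sorted

  def main(args: Array[String]): Unit =
    println(sortedIds)
}
```

Sorting such ids ascending would therefore replay these two events in the wrong order.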

The relevant code is:

object CurrentEventsByTag {
  def source(driver: RxMongoDriver, tag: String, fromOffset: Offset)(implicit m: Materializer): Source[(Event, Offset), NotUsed] = {
    import driver.RxMongoSerializers._
    implicit val ec: ExecutionContext = driver.querySideDispatcher
    val offset = fromOffset match {
      case NoOffset => None
      case ObjectIdOffset(hexStr, _) => BSONObjectID.parse(hexStr).toOption
    }
    val query = BSONDocument(
      TAGS -> tag
    ).merge(offset.fold(BSONDocument.empty)(id => BSONDocument(ID -> BSONDocument("$gt" -> id))))
    Source.fromFuture(driver.journalCollectionsAsFuture)
      .flatMapConcat { xs =>
        xs.map(c =>
          c.find(query)
            .sort(BSONDocument(ID -> 1))
            .cursor[BSONDocument]()
            .documentSource()
        ).reduceLeftOption(_ ++ _)
         .getOrElse(Source.empty)
      }.map { doc =>
        val id = doc.getAs[BSONObjectID](ID).get
        doc.getAs[BSONArray](EVENTS)
          .map(_.elements
            .map(_.value)
            .collect { case d: BSONDocument => driver.deserializeJournal(d) -> ObjectIdOffset(id.stringify, id.time) }
            .filter(_._1.tags.contains(tag))
          )
          .getOrElse(Nil)
      }.mapConcat(identity)
  }
}

with BSONDocument(ID -> BSONDocument("$gt" -> id)) and .sort(BSONDocument(ID -> 1)) being the issue, I suppose. I believe it should be .sort(BSONDocument(SEQUENCE_NUMBER -> 1)).

@pepite pepite changed the title CurrentEventsByTag wrong query CurrentEventsByTag query does not guarantee ordering of events Dec 23, 2018
scullxbones (Owner) commented:

Hi @pepite -

There's a bunch of discussion on this in #37. Implementing a global sequence number (#95 maybe) should solve the issue.

I believe it should be .sort(BSONDocument(SEQUENCE_NUMBER -> 1))

The problem with SEQUENCE_NUMBER is that this counter is unique per persistenceId. Tags are typically used across persistenceIds, so I believe a sort on just this field would end up worse than _id: it would cause sort problems even outside of one-second intervals.
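The mismatch described here can be seen with a few made-up in-memory events (plain Scala, no plugin APIs; all names and values are hypothetical). One long-lived entity has high sequence numbers, a newly created one starts at 1, so sorting the tagged stream by sequenceNumber alone puts the chronologically newest event first:

```scala
object SeqNrAcrossPidsSketch {
  final case class Ev(pid: String, seqNr: Long, wallClock: Long)

  // pid-a has been running far longer than pid-b; all three events share one tag.
  // wallClock stands in for the real insertion order.
  val events: List[Ev] = List(
    Ev("pid-a", seqNr = 100L, wallClock = 1L),
    Ev("pid-a", seqNr = 101L, wallClock = 2L),
    Ev("pid-b", seqNr = 1L, wallClock = 3L) // chronologically last, smallest seqNr
  )

  // Sorting by sequenceNumber alone interleaves unrelated entities:
  // the newest event (pid-b#1) comes out first, not last.
  def bySeqNr: List[Ev] = events.sortBy(_.seqNr)

  def main(args: Array[String]): Unit =
    println(bySeqNr.map(e => s"${e.pid}#${e.seqNr}").mkString(", "))
}
```

So the misordering is unbounded in time, unlike the _id case, where it is confined to ids minted in the same second.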


pepite commented Dec 26, 2018

Thanks, I will have a look at what I can do and submit a PR.


pepite commented Dec 27, 2018

Would a solution then be to group by persistenceId and then sort by sequence number? Or am I missing something?
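For illustration, the grouping idea sketched on made-up in-memory data (plain Scala, not the plugin's API): it restores per-entity order, but as the next comment points out, it says nothing about the order between entities:

```scala
object GroupThenSortSketch {
  final case class Ev(pid: String, seqNr: Long)

  // Hypothetical tagged events from two persistenceIds, received out of order
  val events: List[Ev] = List(Ev("a", 2L), Ev("b", 1L), Ev("a", 1L))

  // Each persistenceId's events come out in seqNr order, but the relative
  // order between "a" and "b" remains undefined.
  def grouped: Map[String, List[Ev]] =
    events.groupBy(_.pid).map { case (pid, evs) => pid -> evs.sortBy(_.seqNr) }

  def main(args: Array[String]): Unit =
    println(grouped)
}
```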

scullxbones (Owner) commented:

I think you really end up needing a sequence number either at the tag level or at the global level to accomplish ordering.

Maybe for some use cases of the plugin the suggested approach of grouping by persistenceId and then sorting by sequence number could work well enough, but in general, for a concurrent set of persistenceIds that share a tag, it will not reflect chronological order.

The case of batched inserts could be fixed by sorting on _id then persistenceId then sequenceNumber, but that doesn't seem to cover the general case IMO.


pepite commented Jan 2, 2019

First version is here: https://github.com/pepite/akka-persistence-mongo/tree/fix-ordering-event . PR to follow (I still need a migration feature, some extra tests and maybe some explanation). However, the tests are not all passing on my Mac (on this branch and on master). I started test_container, but some of them fail, especially the Akka test kit ones. Is there anything I should be aware of?

scullxbones (Owner) commented:

I will take a deeper look soon, but from a quick look I'm concerned that it may not be backward compatible. It could be that I just haven't looked at it deeply enough yet. We just need to tread carefully, since this code has been in production for a while (the write side for several years, the read side less).

The tests have a couple of race conditions in them, so they do not always pass. I use a minimum of akka.test.timefactor=3. With this, the test suite passes on Travis for most combinations of (Scala + Mongo) on the first attempt.


pepite commented Jan 3, 2019 via email

yahor-filipchyk commented:

We are experiencing this issue occasionally. The main problem, it seems, is that the _id is generated on the client side, and the realtime collection is affected by this the most. Since writes happen concurrently, the order of events when sorted by _id doesn't always match the insertion order, and for the realtime collection the insertion order is what matters (because it is a capped collection). If the _id were generated on the server, the sequential ordering of offsets would be guaranteed, because all writes go through the primary node (not sure how this works for sharded collections). But then you'd have to somehow sequence the insertion of documents into the realtime collection, which is probably impossible (or impractical) in a multi-node environment.

I have one idea that should work for Mongo version >= 3.6. It will change the read side setup though, but shouldn't require migration since the journal collection is the source of truth anyway.

  1. Don't generate the _id on the client side. Unfortunately that's the default behavior for inserts in the official Mongo driver, but I think there are a few workarounds (override the codec for BsonDocument, or use findOneAndUpdate with upsert=true instead of insert).
  2. Use db.collection.watch on the journal collection as the source for the read side. It produces a resumable cursor (though just failing the stream if something goes wrong would be fine to start). Watch only works with replica sets, though, so we'd need to do something else for single-node deployments (and local testing).

@scullxbones what do you think?

I'm not sure about the performance implications of this approach, but it sounds like a robust solution that should guarantee consistency between the write and read sides.

scullxbones (Owner) commented:

@scullxbones what do you think?

Yep, no doubt the 3.6 watch functionality is cool. 3.4 is EOL in January 2020; we can revisit then. Before that, I don't want to stop supporting prior versions.

I'd be interested, though, in the workarounds you mentioned in (1). Especially for #238 - it would be good to make the 3 drivers as consistent as possible, or at least the 2 that are actively developed.
