fix: replicatedEventOriginFilter should not filter replayed events #1129
Conversation
```scala
producerSource.replicatedEventOriginFilter
  .flatMap(f => init.replicaInfo.map(f.createFilter))
  .getOrElse((_: EventEnvelope[_]) => true)

val eventsStreamOut: Flow[StreamIn, StreamOut, NotUsed] =
  eventsFlow.mapAsync(producerSource.settings.transformationParallelism) { env =>
```
Below are mostly formatting changes. It looks for the FilteredPayload emitted by the FilterStage and transforms that into the proto FilteredEvent. It may be ugly to use FilteredPayload for this, but it wouldn't be better to mess with the source field.
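As a rough illustration of that transformation (all type and method names below are hypothetical, simplified stand-ins, not the actual akka-projection-grpc API):

```scala
// Sketch with hypothetical types: the FilterStage emits FilteredPayload as a
// marker payload; downstream it is mapped to a proto-level FilteredEvent so
// the consumer can advance its offset without receiving the real payload.
sealed trait StreamOut
final case class Event(persistenceId: String, seqNr: Long, payload: Any) extends StreamOut
final case class FilteredEvent(persistenceId: String, seqNr: Long) extends StreamOut

case object FilteredPayload

def toStreamOut(persistenceId: String, seqNr: Long, payload: Any): StreamOut =
  payload match {
    case FilteredPayload => FilteredEvent(persistenceId, seqNr)
    case other           => Event(persistenceId, seqNr, other)
  }
```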
```scala
if (producerFilter(env) && filter.matches(env)) {
  log.traceN("Stream [{}]: Push event persistenceId [{}], seqNr [{}]", logPrefix, pid, env.sequenceNr)
  push(outEnv, env)

if (replicatedEventOriginFilter(env)) {
```
Replayed events are not emitted via this InHandler.
Note that the RES Behavior will also filter duplicates itself, so I think the EventOriginFilter was mostly added as an optimization.
Seems reasonable
* EventOriginFilter is only used for ReplicatedEventSourcing.
* It was applied after the FilterStage and therefore also filtered events from replay requests. That works if the replica can instead receive the event from the origin replica, but when combined with a dynamic consumer filter the origin replica may not emit anything, due to the consumer filter on that replica.
* The solution is to use the EventOriginFilter inside the FilterStage instead, so that it is not applied to replay requests.
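The key point of the fix can be sketched like this (simplified, hypothetical names, not the actual implementation): the origin filter participates only in the live-event path of the FilterStage, while explicitly requested replays are evaluated without it.

```scala
// Simplified sketch: live events pass through both the consumer filter and
// the origin filter; replayed events skip the origin filter, because the
// consuming replica asked for them explicitly.
final case class Env(pid: String, seqNr: Long, originReplica: String)

// don't send an event back to the replica it originated from
// (an optimization; the RES Behavior also de-duplicates on its own)
def originFilter(consumerReplica: String)(env: Env): Boolean =
  env.originReplica != consumerReplica

def liveEvent(env: Env, consumerFilter: Env => Boolean, consumerReplica: String): Option[Env] =
  if (consumerFilter(env) && originFilter(consumerReplica)(env)) Some(env) else None

def replayedEvent(env: Env, consumerFilter: Env => Boolean): Option[Env] =
  // replay requests must not be dropped by the origin filter
  if (consumerFilter(env)) Some(env) else None
```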
Force-pushed from 17f1392 to 663c9de
* A separate table per replica is needed because otherwise the eventsBySlice query for a given entity type would pick up events written by another replica, which would be wrong.
```hocon
akka.http.server.preview.enable-http2 = on
akka.persistence.r2dbc {
  journal.table = "event_journal_${dc.id}"
}
```
This test revealed the issue with sharing the same table for different replicas: the eventsBySlice query for a given entity type would pick up events written by another replica, which would be wrong.
I changed all tests.
Great, sorry about that shortcut causing extra work now.
```scala
// replicate to C
entityRefA.ask(TestEntity.SetScope(Set(DCC.id), _)).futureValue
eventually {
  entityRefC.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCC.id))
}
```
Here is the point where it would fail without the fix.
I'll have to test this more, and think about whether it is the right solution. It works for the reported problematic scenario.