Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migration to replicated event sourcing #1171

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.replication

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.grpc.GrpcClientSettings
import akka.http.scaladsl.Http
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication.scaladsl.Replica
import akka.projection.grpc.replication.scaladsl.Replication
import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.projection.r2dbc.scaladsl.R2dbcReplication
import akka.testkit.SocketUtil
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

object PushReplicationMigrationIntegrationSpec {

private def config(dc: ReplicaId): Config = {
val journalTable = if (dc == ReplicaId.empty) "event_journal" else s"event_journal_${dc.id}"
val timestampOffsetTable =
if (dc == ReplicaId.empty) "akka_projection_timestamp_offset_store"
else s"akka_projection_timestamp_offset_store_${dc.id}"
ConfigFactory.parseString(s"""
akka.actor.provider = cluster
akka.http.server.preview.enable-http2 = on
akka.persistence.r2dbc {
journal.table = "$journalTable"
query {
refresh-interval = 500 millis
# reducing this to have quicker test, triggers backtracking earlier
backtracking.behind-current-time = 3 seconds
}
}
akka.projection.grpc {
producer {
query-plugin-id = "akka.persistence.r2dbc.query"
}
}
akka.projection.r2dbc.offset-store {
timestamp-offset-table = "$timestampOffsetTable"
}
akka.remote.artery.canonical.host = "127.0.0.1"
akka.remote.artery.canonical.port = 0
akka.actor.testkit.typed {
filter-leeway = 10s
system-shutdown-default = 30s
}
""")
}

private val DCA = ReplicaId("DCA")
private val DCB = ReplicaId("DCB")
private val EdgeReplicaA = ReplicaId.empty

}

class PushReplicationMigrationIntegrationSpec(testContainerConf: TestContainerConf)
extends ScalaTestWithActorTestKit(
akka.actor
.ActorSystem(
"ReplicationMigrationIntegrationSpecA",
PushReplicationMigrationIntegrationSpec
.config(PushReplicationMigrationIntegrationSpec.DCA)
.withFallback(testContainerConf.config))
.toTyped)
with AnyWordSpecLike
with TestDbLifecycle
with BeforeAndAfterAll
with LogCapturing {
import PushReplicationMigrationIntegrationSpec._
import ReplicationMigrationIntegrationSpec.HelloWorld
implicit val ec: ExecutionContext = system.executionContext

def this() = this(new TestContainerConf)

private val logger = LoggerFactory.getLogger(classOf[PushReplicationMigrationIntegrationSpec])
override def typedSystem: ActorSystem[_] = testKit.system

private val systems = Seq[ActorSystem[_]](
typedSystem,
akka.actor
.ActorSystem(
"ReplicationMigrationIntegrationSpecB",
PushReplicationMigrationIntegrationSpec
.config(PushReplicationMigrationIntegrationSpec.DCB)
.withFallback(testContainerConf.config))
.toTyped,
akka.actor
.ActorSystem(
"ReplicationMigrationIntegrationSpecEdgeA",
PushReplicationMigrationIntegrationSpec
.config(PushReplicationMigrationIntegrationSpec.EdgeReplicaA)
.withFallback(testContainerConf.config))
.toTyped)

private val grpcPorts = SocketUtil.temporaryServerAddresses(2, "127.0.0.1").map(_.getPort)
def grpcClientSettings(index: Int) =
GrpcClientSettings.connectToServiceAt("127.0.0.1", grpcPorts(index)).withTls(false)
private val replicaA = Replica(DCA, 2, grpcClientSettings(0))
private val replicaB = Replica(DCB, 2, grpcClientSettings(1))
private val allCloudReplicas = Set(replicaA, replicaB)

/*
private val _ = Replica(
EdgeReplicaA,
2,
// Note: there is no way to actively connect to this replica, instead the GrpcClientSettings would be how _it_ connects
// (to DCA in this case). The normal replicas does not have the Replica in their lists of all replicas
replicaA.grpcClientSettings)
*/

private val testKitsPerDc =
Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), EdgeReplicaA -> ActorTestKit(systems(2)))
private val systemPerDc = Map(DCA -> system, DCB -> systems(1), EdgeReplicaA -> systems(2))
private var replicationA: Replication[HelloWorld.Command] = _
private var replicationB: Replication[HelloWorld.Command] = _
private var edgeReplicationA: EdgeReplication[HelloWorld.Command] = _
private val entityIdOne = "one"
private val entityIdTwo = "two"

override protected def beforeAll(): Unit = {
super.beforeAll()
systemPerDc.values.foreach(beforeAllDeleteFromTables)
}

def startReplica(replicaSystem: ActorSystem[_], replica: Replica): Future[Replication[HelloWorld.Command]] = {
logger
.infoN(
"Starting replica [{}], system [{}] on port [{}]",
replica.replicaId,
replicaSystem.name,
replica.grpcClientSettings.defaultPort)

val grpcPort = replica.grpcClientSettings.defaultPort
val settings = ReplicationSettings[HelloWorld.Command](
HelloWorld.EntityType,
replica.replicaId,
EventProducerSettings(replicaSystem),
allCloudReplicas,
10.seconds,
8,
R2dbcReplication()).withEdgeReplication(true)
val started =
Replication.grpcReplication(settings)(HelloWorld.replicated)(replicaSystem)

// start producer server
Http()(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
.map(_ => started)
}

def startEdgeReplica(
replicaSystem: ActorSystem[_],
selfReplicaId: ReplicaId,
connectTo: Replica): EdgeReplication[HelloWorld.Command] = {
val settings = ReplicationSettings[HelloWorld.Command](
HelloWorld.EntityType,
selfReplicaId,
EventProducerSettings(replicaSystem),
Set(connectTo),
10.seconds,
// few on edge node (but 2 rather than 1 here to make sure test actually covers parallel updates)
2,
R2dbcReplication()).withEdgeReplication(true)
Replication.grpcEdgeReplication(settings)(HelloWorld.replicated)(replicaSystem)
}

"Replication over gRPC" should {
"form one edge cluster" in {
// Probably not realistic to start with non-replicated at edge, and then make it replicated,
// but that will test the push aspect, which is most different compared to ReplicationMigrationIntegrationSpec
val testKit = testKitsPerDc(EdgeReplicaA)
val cluster = Cluster(testKit.system)
cluster.manager ! Join(cluster.selfMember.address)
testKit.createTestProbe().awaitAssert {
cluster.selfMember.status should ===(MemberStatus.Up)
}
}

"persist events with non-replicated EventSourcedBehavior" in {
val testKit = testKitsPerDc(EdgeReplicaA)
Set(entityIdOne, entityIdTwo).foreach { entityId =>
withClue(s"for entity id $entityId") {
val entity = testKit.spawn(HelloWorld.nonReplicated(PersistenceId.of(HelloWorld.EntityType.name, entityId)))

import akka.actor.typed.scaladsl.AskPattern._
entity.ask(HelloWorld.SetGreeting("hello 1 from non-replicated", _)).futureValue
entity.ask(HelloWorld.SetGreeting("hello 2 from non-replicated", _)).futureValue
testKit.stop(entity)
}
}
}

"form three one node clusters" in {
testKitsPerDc.foreach {
case (dc, testKit) =>
if (dc != EdgeReplicaA) {
val cluster = Cluster(testKit.system)
cluster.manager ! Join(cluster.selfMember.address)
testKit.createTestProbe().awaitAssert {
cluster.selfMember.status should ===(MemberStatus.Up)
}
}
}
}

"start two cloud replicas and one edge replica" in {
replicationA = startReplica(systemPerDc(DCA), replicaA).futureValue
replicationB = startReplica(systemPerDc(DCB), replicaB).futureValue
edgeReplicationA = startEdgeReplica(systemPerDc(EdgeReplicaA), EdgeReplicaA, replicaA)
logger.info("All three replication/producer services bound")
}

"recover from non-replicated events" in {
testKitsPerDc.values.foreach { testKit =>
withClue(s"on ${testKit.system.name}") {
val probe = testKit.createTestProbe()

Set(entityIdOne, entityIdTwo).foreach { entityId =>
withClue(s"for entity id $entityId") {
val entityRef = ClusterSharding(testKit.system)
.entityRefFor(HelloWorld.EntityType, entityId)

probe.awaitAssert({
entityRef
.ask(HelloWorld.Get.apply)
.futureValue should ===("hello 2 from non-replicated")
}, 10.seconds)
}
}
}
}
}

"replicate writes directly from cloud to edge" in {
for {
n <- 1 to 5
entityId <- Set(entityIdOne, entityIdTwo)
} {
logger.infoN("Updating greeting {} for [{}] from dc [{}]", n, entityId, DCA)
replicationA
.entityRefFactory(entityId)
.ask(HelloWorld.SetGreeting(s"hello $n from ${DCA.id}", _))
.futureValue

val edgeEntityRef = edgeReplicationA.entityRefFactory(entityId)
val probe = testKit.createTestProbe()
probe.awaitAssert({
edgeEntityRef
.ask(HelloWorld.Get.apply)
.futureValue should ===(s"hello $n from ${DCA.id}")
}, 10.seconds)

// and also B ofc (unrelated to edge replication but for good measure)
val dcBEntityRef = replicationB.entityRefFactory(entityId)
probe.awaitAssert({
dcBEntityRef
.ask(HelloWorld.Get.apply)
.futureValue should ===(s"hello $n from ${DCA.id}")
}, 10.seconds)
}
}

"replicate writes from edge node to cloud" in {
for {
n <- 6 to 10
entityId <- Set(entityIdOne, entityIdTwo)
} {
logger.infoN("Updating greeting {} for [{}] from edge dc", n, entityId)
edgeReplicationA
.entityRefFactory(entityId)
.ask(HelloWorld.SetGreeting(s"hello $n from edge", _))
.futureValue

val probe = testKit.createTestProbe()
// should reach the direct replica
val dcAEntityRef = replicationA.entityRefFactory(entityId)
probe.awaitAssert({
dcAEntityRef
.ask(HelloWorld.Get.apply)
.futureValue should ===(s"hello $n from edge")
}, 10.seconds)

// then indirectly replica B
val dcBEntityRef = replicationB.entityRefFactory(entityId)
probe.awaitAssert({
dcBEntityRef
.ask(HelloWorld.Get.apply)
.futureValue should ===(s"hello $n from edge")
}, 10.seconds)
}
}
}

protected override def afterAll(): Unit = {
logger.info("Shutting down all three DCs")
systems.foreach(_.terminate()) // speed up termination by terminating all at once
// and then make sure they are completely shutdown
systems.foreach { system =>
ActorTestKit.shutdown(system)
}
super.afterAll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object ReplicationIntegrationSpec {

object State {
val initial =
State("Hello world", LwwTime(Long.MinValue, ReplicaId("")), "", LwwTime(Long.MinValue, ReplicaId("")))
State("Hello world", LwwTime(Long.MinValue, ReplicaId.empty), "", LwwTime(Long.MinValue, ReplicaId.empty))
}

case class State(greeting: String, greetingTimestamp: LwwTime, tag: String, tagTimestamp: LwwTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object ReplicationJavaDSLIntegrationSpec {
final case class TagChanged(tag: String, timestamp: LwwTime) extends Event

object State {
val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId("")), "")
val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId.empty), "")
}

case class State(greeting: String, timestamp: LwwTime, tag: String)
Expand Down
Loading
Loading