Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step 2: Add Akka Cluster Sharding features #2

Open
wants to merge 10 commits into
base: feature/1-akka-cluster-singleton
Choose a base branch
from
Open
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ libraryDependencies ++= {
val logBackVersion = "1.2.3"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"de.heikoseeberger" %% "akka-http-json4s" % akkaHttpJson4sVersion,
Expand Down
34 changes: 13 additions & 21 deletions src/main/scala/HttpServerApp.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import akka.actor.{ActorSystem, PoisonPill}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
import akka.actor.ActorSystem
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
Expand All @@ -8,7 +8,7 @@ import api.{CustomExceptionHandling, EventHttpEndpoint}
import com.typesafe.config.{Config, ConfigFactory}
import persistence.EventRepository
import reactivemongo.api.{MongoConnection, MongoDriver}
import services.{EventManager, TicketSellerSupervisor}
import services.{EventManager, TicketSeller}

import scala.concurrent.Await
import scala.concurrent.duration._
Expand Down Expand Up @@ -45,24 +45,16 @@ trait PersistenceModule { self: AkkaModule with SettingsModule =>
}

trait ServicesModule { self: AkkaModule with PersistenceModule =>
// Initiates singleton TicketSellerSupervisor in the Cluster
val ticketSellerSupervisorSingleton =
system.actorOf(
ClusterSingletonManager.props(
singletonProps = TicketSellerSupervisor.props(),
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(system).withSingletonName(s"${TicketSellerSupervisor.Name}-singleton")),
name = s"${TicketSellerSupervisor.Name}-singleton-manager")

// Initiates proxy for singleton
val ticketSellerSupervisorSingletonProxy =
system.actorOf(
ClusterSingletonProxy.props(
singletonManagerPath = ticketSellerSupervisorSingleton.path.toStringWithoutAddress,
settings = ClusterSingletonProxySettings(system).withSingletonName(s"${TicketSellerSupervisor.Name}-singleton")),
name = s"${TicketSellerSupervisor.Name}-singleton-proxy")

val eventManager = system.actorOf(EventManager.props(eventRepository, ticketSellerSupervisorSingletonProxy), EventManager.Name)
// Initiates ShardRegion for TicketSeller
val ticketSellerShardRegion =
ClusterSharding(system).start(
typeName = TicketSeller.Name,
entityProps = TicketSeller.props(),
settings = ClusterShardingSettings(system),
extractEntityId = TicketSeller.Sharding.extractEntityId,
extractShardId = TicketSeller.Sharding.extractShardId)

val eventManager = system.actorOf(EventManager.props(eventRepository, ticketSellerShardRegion), EventManager.Name)
}

trait EndpointsModule { self: AkkaModule with ServicesModule =>
Expand Down
11 changes: 11 additions & 0 deletions src/main/scala/services/TicketSeller.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package services

import akka.actor.{Actor, FSM, Props}
import akka.cluster.sharding.ShardRegion
import domain.{Ticket, _}

object TicketSeller {
Expand Down Expand Up @@ -37,6 +38,16 @@ object TicketSeller {
}
}

object Sharding {
val extractEntityId: ShardRegion.ExtractEntityId = {
case msg @ EventMessage(jobId, _) => (jobId.toString, msg)
}

val extractShardId: ShardRegion.ExtractShardId = {
case EventMessage(jobId, _) => (math.abs(jobId.toString.hashCode) % 100).toString
}
}

}

class TicketSeller extends Actor with FSM[TicketSeller.State, TicketSeller.BoxOffice] {
Expand Down
27 changes: 0 additions & 27 deletions src/main/scala/services/TicketSellerSupervisor.scala

This file was deleted.