Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step 3: Add Akka Persistence features #3

Open
wants to merge 16 commits into
base: feature/2-akka-cluster-sharding
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ scalaVersion := "2.12.2"

libraryDependencies ++= {
val akkaVersion = "2.5.3"
val akkaPersistenceCassandra = "0.54"
val akkaHttpVersion = "10.0.7"
val akkaHttpJson4sVersion = "1.11.0"
val json4sVersion = "3.5.0"
Expand All @@ -14,6 +15,8 @@ libraryDependencies ++= {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandra,
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"de.heikoseeberger" %% "akka-http-json4s" % akkaHttpJson4sVersion,
Expand Down
13 changes: 13 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ akka {
auto-down-unreachable-after = 10s
}

persistence {
journal {
plugin = "cassandra-journal"
auto-start-journals = ["cassandra-journal"]
}
snapshot-store {
plugin = "cassandra-snapshot-store"
auto-start-snapshot-stores = ["cassandra-snapshot-store"]
}
}

extensions = [akka.persistence.Persistence]

}

mongodb {
Expand Down
63 changes: 51 additions & 12 deletions src/main/scala/services/TicketSeller.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package services

import akka.actor.{Actor, FSM, Props}
import akka.actor.{Actor, Props}
import akka.cluster.sharding.ShardRegion
import akka.persistence.fsm.PersistentFSM
import akka.persistence.fsm.PersistentFSM.FSMState
import domain.{Ticket, _}

import scala.reflect.{ClassTag, classTag}

object TicketSeller {
val Name = "ticket-seller"

Expand All @@ -12,10 +16,19 @@ object TicketSeller {
}

// States
sealed trait State
case object Idle extends State
case object Active extends State
case object SoldOut extends State
sealed trait State extends FSMState

case object Idle extends State {
override def identifier: String = "idle"
}

case object Active extends State {
override def identifier: String = "active"
}

case object SoldOut extends State {
override def identifier: String = "sold-out"
}

// Data
sealed trait BoxOffice {
Expand All @@ -38,6 +51,11 @@ object TicketSeller {
}
}

// Events
sealed trait DomainEvent
case class TicketsAdded(tickets: Seq[Ticket]) extends DomainEvent
case object TicketBought extends DomainEvent

object Sharding {
val extractEntityId: ShardRegion.ExtractEntityId = {
case msg @ EventMessage(jobId, _) => (jobId.toString, msg)
Expand All @@ -50,27 +68,50 @@ object TicketSeller {

}

class TicketSeller extends Actor with FSM[TicketSeller.State, TicketSeller.BoxOffice] {
class TicketSeller extends Actor with PersistentFSM[TicketSeller.State, TicketSeller.BoxOffice, TicketSeller.DomainEvent] {

import TicketSeller._

log.info("Starting TicketSeller at {}", self.path)

override def persistenceId: String = {
// Note:
// self.path.parent.parent.name is the ShardRegion actor name: ticket-seller
// self.path.parent.name is the Shard supervisor actor name: 5
// self.path.name is the sharded Entity actor name: 597be7e24e00004500292035
s"${self.path.parent.parent.name}-${self.path.parent.name}-${self.path.name}"
}

override def domainEventClassTag: ClassTag[DomainEvent] = classTag[DomainEvent]

override def applyEvent(domainEvent: DomainEvent, boxOfficeBeforeEvent: BoxOffice): BoxOffice = {
domainEvent match {
case TicketsAdded(tickets) => NonEmptyBoxOffice(tickets)

case TicketBought =>
val (_, newBoxOffice) = boxOfficeBeforeEvent.buy()
newBoxOffice
}
}

startWith(Idle, EmptyBoxOffice)

when(Idle) {
case Event(EventMessage(_, AddTickets(tickets)), _) => {
goto(Active) using NonEmptyBoxOffice(tickets)
goto(Active) applying TicketsAdded(tickets)
}
}

when(Active) {
case Event(EventMessage(_, BuyTicket), boxOffice: BoxOffice) => {
val (boughtTicket, newBoxOffice) = boxOffice.buy()
sender ! boughtTicket
newBoxOffice match {
case bo: NonEmptyBoxOffice => stay using bo
case EmptyBoxOffice => goto(SoldOut) using EmptyBoxOffice
case _: NonEmptyBoxOffice => stay applying TicketBought andThen { _ =>
sender ! boughtTicket // respond to the sender once the event has been persisted successfully
}
case EmptyBoxOffice => goto(SoldOut) applying TicketBought andThen { _ =>
sender ! boughtTicket // respond to the sender once the event has been persisted successfully
}
}
}
}
Expand Down Expand Up @@ -101,6 +142,4 @@ class TicketSeller extends Actor with FSM[TicketSeller.State, TicketSeller.BoxOf
}
}

initialize()

}