Don't miss: ddd-leaven-akka-v2 - runnable sample e-commerce application built from Akka-DDD artifacts described in this series.
Recap of lesson 1
Inside AR we have separated concept of AR's state (AggregateState) and concept of AR's state factory (AggregateRootFactory), both modeled as state machines that compute next state based on the current state and incoming event.
The separation of AR actor and its internal state allows looking at AR creation from two different perspective. First one is domain-centric: AggregateRootFactory encapsulates domain logic only. Second one is technical: AR is an actor that lives somewhere in actor system, thus, for example, it should be provided references to other actors it needs to communicate with. But how actually AR actor is created? Apparently, we have not touched this topic in lesson 1. It's time to get it cleared. The complete code for lesson 2 is available here
Aggregate Root actor creation
Don't call me, call my office
From the client perspective this pattern is more complicated than it should be. The client just wants to send a command to an actor that knows how to deal with it. He should not be troubled with finding an actor he can talk to. I think it's pretty obvious now that we need a single actor serving all reservation related requests from the clients. The actor should hide the logic of passing messages between clients and reservation actors which means he should be able to manage reservation actors (create them and kill them on demand or in case of failure). In Akka this kind of relationship is called parental supervision. All actors in Akka (excluding internal top-level actors) are created by other actors (their parents) that become their supervisors.
To introduce just discovered pattern to our reservation domain we need to find some friendly and meaningful name for it. The client should not be contacted with "supervising parent of Reservation actors" but rather he should be talking to reservation office.
Office and clerks
Reservation office takes care of handling reservation requests from the clients. The actual job is performed by clerks. Clerks are working on cases they are assigned to (where case id is aggregate id, example: "reservation-1"). AR actor is thus a clerk with assigned case.I find this analogy quite useful but if you have a better one just drop me a comment.
Implementation of Office actor class should be generic, not tied to particular domain. When request (command) comes in, office should resolve/extract case id from the command and either find a clerk that is currently working on this case or (if there is no such clerk) create new clerk and assign him the case (this translates to creating an child actor with name equal to case id). The problem is that generic office needs to resolve case identifier from command that can be of any type. To make this possible first we need to make Office class parameterized (Office[T <: AggregateRoot]) and then apply type class pattern to inject function Command => AggregateId (AggregateIdResolver) via implicit constructor parameter into concrete Office instance. In other words, creation of office for any class of Aggregate is possible assuming there is implicitly available implementation of AggregateIdResolver for that class.
With the following declaration AggregateIdResolver for Reservation becomes available:
Recipe for actor
Lets talk about actor creation in details. Actors in Akka are created from recipes (Props class). Recipes can be created in different ways, for example by providing class of an actor to be created and its constructor parameters (if any). This way generic office actor can construct recipe of the actor to be created if it knows its class and constructor parameters.Now we can complete implementation of office actor. After obtaining reference to clerk actor, office actor must just forward the command to the clerk. Please see code of the Office class below:
We need yet helper function that creates office as top-level actor:
Finally the client can send reservation commands to reservation office instead of individual clerk:
Firing of clerks
The question arises what happens to clerk actors after they finish processing the command(s) and become idle. Well, nothing, they live (in memory) until they are stopped. Thus, we should take care of dismissing clerks that are idle. This can be easily achieved by defining inactivity timeout after which the clerk is notified with ReceiveTimeout message. Being notified, clerk should dismiss (stop) himself or, to avoid dropping commands just enqueued in his inbox, ask office to dismiss him gracefully. This pattern/protocol is called graceful passivation. Because all Aggregate Roots should support this pattern, AggregateRoot class has been enriched with GracefulPassivation trait. Currently Office class does not fully implement graceful passivation of clerks (clerks are stopped, but enqueued messages might be lost) but don't worry, Akka provides much more robust implementation of office pattern: akka-sharding. Akka-sharding does not only supports graceful passivation, but first of all allows office to work in distributed environment.
Global office
Let's see how global reservation office can be created in Akka using akka-sharding.
Testing global reservation office
Thanks to sbt-multi-jvm plug-in and MultiNodoSpec writing cluster-aware tests in Akka is surprisingly easy. We will not dive into details of cluster configuration but concentrate on sharding. ReservationGlobalOfficeSpec is the test we are going to discuss now. As you can see, at startup, shared journal is configured and cluster is started (consisting of two nodes). Please note that sbt-multi-jvm plug-in will automatically execute the same test on each node concurrently.The following method is executed on each node to start sharding:
startSharding is a method that actually starts sharding by invoking
Akka-sharding requires IdExtractor and ShardResolver to be defined for AR that is going to be shared. Logic of IdExtractor we have already encapsulated in AggregateIdResolution. Now we will introduce abstract class ShardResolution that extends AggregateIdResolution and defines shardResolver that extracts shard id from the command. This logic is implemented as composition of shardResolutionStrategy and aggregateIdResolver:
Default shard resolution strategy (available in ShardResolution companion object) returns first char from hexadecimal hashcode generated from aggregateId. This means up to 16 shards can be created (0-9 and A-F) (see http://guide.couchdb.org/draft/clustering.html#hashing to learn more about consistent hashing in context of sharding).
Shard resolution strategy in the test however (see code above) is different. It takes just last character from aggregateId. Thus in the first test we expect two reservations "reservation-1" and "reservation-2" being assigned to clerks working on different nodes:
The instance of reservation office is not our previously implemented Office actor but actor created by akka-sharding (helper method globalOffice takes care of that). During the test two requests for creating reservations "reservation-1" and "reservation-2" are sent from node 1. We expect that exactly one request will be processed on both nodes. After both requests are processed, we verify if subsequent requests are correctly processed if sent from a different node than creation requests:
With this test we complete lesson 2.
In next lesson we will try to build views so that our application can respond to queries.
Hi,
ReplyDeleteI had some problem with expectEventPersisted method.
Apparently its possible that "when" is executed before EventFilter is started.
Result is that i have message Event persisted in log, then test crashes due to timeout.
This happens "most of the time", it sometimes passes during debugging.
Thanks for reporting. The bug is fixed now ('when' parameter should be passed 'by name', not 'by value'' to expectEventPersisted)
ReplyDeleteThank you :)
DeleteCant wait for next lesson, great stuff !
I found that in current solution it's not possible to use Views.
ReplyDeleteAggregateRoot shout override processorId with clerk name.
Regards ;)
Fixed. processorId returns aggregateId by default now. Thanks again for reporting :)
ReplyDeletePleasure is all mine :)
ReplyDeleteCould you please shed light on where are you going with trait ReliablePublisher?
As far as i can tell its purpose is for domain events to be "transported" to some actor, but which ?
Is there a reason to pick this approach instead of EventBus (ex. with sub channel classification) ?
Is it essential for the events to be published during AR recovery?
Regards :)
ReliablePublisher guarantees that event will be published (delivered) to provided destination actor at-least-once (the default delivery semantics in akka is at-most-once). Destination actor must confirm (acknowledge) received message. Nothing prevents destination actor from publishing to EventBus (see: ProductPublishingSpec/ EventLocalConfirmablePublisher)
DeleteFor more details you can check official docs: http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Reliable_event_delivery
Hopefully I will publish next lesson soon that will cover this topic as well.
Hi , very nice post .
ReplyDeleteI know it is out of scope but I kindly would like to consult with you. I am using a sort of scatter gather pattern to fetch data. each actor should fetch part of data according to param (e.d customer ID) . and an actor that aggregates the results applying some business logic and stores the result in the db .
so what is the best practice to achieve that ? should I send the repository to each actor constructor (e.g class FooService(invoicesRepo:InvoicesRepository) extends Actor ....
and while testing to send mock[InvoicesRepository] )
? or using cake pattern or other abstraction ?
any links or examples will be helpful