30 April 2014

Reactive DDD with Akka - lesson 2

Don't miss: ddd-leaven-akka-v2 - runnable sample e-commerce application built from Akka-DDD artifacts described in this series.

Recap of lesson 1


In the previous lesson we implemented an abstraction of an event-sourced Aggregate Root (AR) using akka-persistence and used it to develop a concrete AR class representing the Reservation process. We showed that an AR is a message-driven component that reacts to command messages. If a command (e.g. ReserveProduct) is accepted, the AR produces an event message representing this fact (e.g. ProductReserved) and, once the event is persisted, sends back an acknowledgment message and propagates the event to the outside world. If a command is rejected, the AR simply responds with a failure message. All messaging happens asynchronously.

Inside the AR we separated the concept of the AR's state (AggregateState) from the concept of the AR's state factory (AggregateRootFactory). Both are modeled as state machines: the factory computes the initial state from the first event, and the state computes the next state from the current state and the incoming event.

The separation of the AR actor from its internal state allows looking at AR creation from two different perspectives. The first one is domain-centric: AggregateRootFactory encapsulates domain logic only. The second one is technical: the AR is an actor that lives somewhere in the actor system, so, for example, it should be given references to the other actors it needs to communicate with. But how is the AR actor actually created? We did not touch on this topic in lesson 1. It's time to clear it up. The complete code for lesson 2 is available here

Aggregate Root actor creation


In a traditional DDD/CQRS architecture, a command handler is responsible for creating an AR and adding it to a repository. When handling subsequent commands, the command handler instantiates the AR by fetching it from the repository. With our new architecture we need to rethink these patterns. First of all, we don't need another command handling component since the AR actor is already a command handler. Secondly, a repository of actors does not seem to be a good abstraction. Actors are created, started, killed and restarted, but not added to or loaded from a repository. In Akka, they are created and supervised by their parents. Let's see how the client's interaction with the Reservation process can be simplified if we make use of the actor supervision pattern.

Don't call me, call my office


Going back to our sales domain and the reservation process from lesson 1: each client, before creating a reservation, first had to obtain a reference to the actor instance (reservation actor) responsible for handling this particular reservation process. We created a method getReservationActor for that purpose:


From the client's perspective this pattern is more complicated than it should be. The client just wants to send a command to an actor that knows how to deal with it. He should not be troubled with finding an actor he can talk to. I think it's pretty obvious now that we need a single actor serving all reservation-related requests from the clients. This actor should hide the logic of passing messages between clients and reservation actors, which means it should be able to manage reservation actors (create them and stop them on demand or in case of failure). In Akka this kind of relationship is called parental supervision. All actors in Akka (excluding internal top-level actors) are created by other actors (their parents) that become their supervisors.
To introduce the newly discovered pattern to our reservation domain we need to find a friendly and meaningful name for it. The client should not have to contact a "supervising parent of Reservation actors"; rather, he should be talking to a reservation office.

Office and clerks

The reservation office takes care of handling reservation requests from the clients. The actual job is performed by clerks. Clerks work on the cases they are assigned to (where the case id is the aggregate id, for example "reservation-1"). An AR actor is thus a clerk with an assigned case.
I find this analogy quite useful, but if you have a better one just drop me a comment.

The implementation of the Office actor class should be generic, not tied to a particular domain. When a request (command) comes in, the office should resolve the case id from the command and either find the clerk that is currently working on this case or (if there is no such clerk) create a new clerk and assign him the case (this translates to creating a child actor with a name equal to the case id). The problem is that a generic office needs to resolve the case identifier from a command that can be of any type. To make this possible we first need to make the Office class parameterized (Office[T <: AggregateRoot]) and then apply the type class pattern to inject a function Command => AggregateId (AggregateIdResolver) into a concrete Office instance via an implicit constructor parameter. In other words, an office can be created for any class of Aggregate as long as an implementation of AggregateIdResolver for that class is implicitly available.

With the following declaration AggregateIdResolver for Reservation becomes available:
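As a rough illustration of the type class pattern described above, such a declaration might look like the minimal sketch below. It is plain Scala without Akka, and everything in it except the names AggregateIdResolver, AggregateRoot and Reservation is an assumption made for the example: the shape of the Command trait, the CreateReservation fields, the resolve method and the caseIdOf helper are hypothetical and not taken from the project.

```scala
object AggregateIdResolverSketch {
  type AggregateId = String

  // Hypothetical command shape: every command knows the id of its aggregate.
  trait Command { def aggregateId: AggregateId }
  final case class CreateReservation(reservationId: String, clientId: String) extends Command {
    def aggregateId: AggregateId = reservationId
  }

  // Stand-in for the AggregateRoot class of the article (no actor here).
  trait AggregateRoot

  // Type class: how to obtain the aggregate id from a command addressed to A.
  trait AggregateIdResolver[A] {
    def resolve(command: Any): Option[AggregateId]
  }

  final class Reservation extends AggregateRoot

  object Reservation {
    // Declared in the companion object, so the compiler finds it wherever
    // an implicit AggregateIdResolver[Reservation] is required.
    implicit val idResolver: AggregateIdResolver[Reservation] =
      new AggregateIdResolver[Reservation] {
        def resolve(command: Any): Option[AggregateId] = command match {
          case c: Command => Some(c.aggregateId)
          case _          => None
        }
      }
  }

  // A generic component (such as an office) only asks for the implicit instance.
  def caseIdOf[A <: AggregateRoot](command: Any)(
      implicit resolver: AggregateIdResolver[A]): Option[AggregateId] =
    resolver.resolve(command)
}
```

With this in scope, caseIdOf[Reservation](CreateReservation("reservation-1", "client-1")) resolves the case id without the generic code knowing anything about reservations.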


Recipe for actor

Let's talk about actor creation in detail. Actors in Akka are created from recipes (the Props class). Recipes can be created in different ways, for example by providing the class of the actor to be created and its constructor parameters (if any). This way a generic office actor can construct the recipe for the actor to be created if it knows its class and constructor parameters.

Now we can complete the implementation of the office actor. After obtaining a reference to the clerk actor, the office actor just has to forward the command to the clerk. Please see the code of the Office class below:
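The find-or-create-then-forward logic can be modeled without Akka. The sketch below is only such a model, not the Office actor itself: the mutable map stands in for the office's child actors, and the Clerk class, the deliver method and the ReserveProduct fields are hypothetical names chosen for the example.

```scala
import scala.collection.mutable

object OfficeSketch {
  type CaseId = String

  // Hypothetical command; only the reservation id matters for routing.
  final case class ReserveProduct(reservationId: String, productId: String)

  // Hypothetical clerk: simply remembers the commands forwarded to it.
  final class Clerk(val caseId: CaseId) {
    val handled = mutable.ListBuffer.empty[Any]
    def handle(command: Any): Unit = { handled += command; () }
  }

  // Plain-Scala model of an office. In Akka the map would be the set of
  // child actors, looked up by a name equal to the case id.
  final class Office(resolveCaseId: Any => CaseId) {
    private val clerks = mutable.Map.empty[CaseId, Clerk]

    // Find the clerk working on the case, or assign the case to a new clerk,
    // then forward the command to that clerk.
    def deliver(command: Any): Clerk = {
      val caseId = resolveCaseId(command)
      val clerk  = clerks.getOrElseUpdate(caseId, new Clerk(caseId))
      clerk.handle(command)
      clerk
    }

    def clerkCount: Int = clerks.size
  }
}
```

Two commands for "reservation-1" end up with the same clerk, while a command for "reservation-2" causes a second clerk to be created.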


We also need a helper function that creates the office as a top-level actor:


Finally, the client can send reservation commands to the reservation office instead of an individual clerk:


Firing of clerks 

The question arises what happens to clerk actors after they finish processing their commands and become idle. Well, nothing: they live (in memory) until they are stopped. Thus, we should take care of dismissing clerks that are idle. This can easily be achieved by defining an inactivity timeout after which the clerk is notified with a ReceiveTimeout message. Once notified, the clerk should dismiss (stop) himself or, to avoid dropping commands that have just been enqueued in his inbox, ask the office to dismiss him gracefully. This protocol is called graceful passivation. Because all Aggregate Roots should support it, the AggregateRoot class has been enriched with the GracefulPassivation trait. Currently the Office class does not fully implement graceful passivation of clerks (clerks are stopped, but enqueued messages might be lost), but don't worry, Akka provides a much more robust implementation of the office pattern: akka-sharding. Akka-sharding not only supports graceful passivation but, first of all, allows the office to work in a distributed environment.


Global office


A single office is limited by available resources (CPU, memory), so it can handle only a limited number of requests concurrently. A logical global office consisting of branch offices distributed among different locations will perform much better under high workload. Each branch office should be delegated to work on a subset of cases. This is the idea of sharding, where a branch office is a shard. Akka-sharding takes care of distributing shards among the available nodes in the cluster according to a given shard allocation strategy. The default strategy allocates a new shard to the node with the least number of allocated shards. The intention is to distribute the workload evenly across all nodes in the cluster. If new nodes are added to the cluster, shards are rebalanced. Because shards are not tied to a single machine, the state of the actors needs to be distributed as well. This means akka-persistence must be configured with a distributed journal.
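The idea behind a "least number of allocated shards" strategy can be sketched in a few lines of plain Scala. This is only a model of the idea, not Akka's implementation: the allocate and allocateAll functions are hypothetical, and breaking ties by node name is an assumption added to keep the example deterministic.

```scala
object LeastShardsSketch {
  type Node    = String
  type ShardId = String

  // Pick the node that currently hosts the fewest shards.
  // Ties are broken by node name (an assumption of this sketch).
  def allocate(current: Map[Node, Set[ShardId]]): Node = {
    require(current.nonEmpty, "at least one node is needed")
    current.toSeq.minBy { case (node, shards) => (shards.size, node) }._1
  }

  // Allocate shards one by one, always to the least loaded node.
  def allocateAll(nodes: Set[Node], shards: Seq[ShardId]): Map[Node, Set[ShardId]] = {
    val empty = nodes.map(node => node -> Set.empty[ShardId]).toMap
    shards.foldLeft(empty) { (allocation, shard) =>
      val node = allocate(allocation)
      allocation.updated(node, allocation(node) + shard)
    }
  }
}
```

Allocating four shards to two nodes this way leaves two shards on each node, which is the even distribution the strategy aims for.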

Let's see how a global reservation office can be created in Akka using akka-sharding.

Testing global reservation office

Thanks to the sbt-multi-jvm plug-in and MultiNodeSpec, writing cluster-aware tests in Akka is surprisingly easy. We will not dive into the details of cluster configuration but concentrate on sharding. ReservationGlobalOfficeSpec is the test we are going to discuss now. As you can see, at startup a shared journal is configured and the cluster is started (consisting of two nodes). Please note that the sbt-multi-jvm plug-in will automatically execute the same test on each node concurrently.

The following method is executed on each node to start sharding:


startSharding is a method that actually starts sharding by invoking


Akka-sharding requires an IdExtractor and a ShardResolver to be defined for the AR that is going to be sharded. The logic of the IdExtractor we have already encapsulated in AggregateIdResolution. Now we will introduce an abstract class ShardResolution that extends AggregateIdResolution and defines a shardResolver that extracts the shard id from the command. This logic is implemented as a composition of shardResolutionStrategy and aggregateIdResolver:


The default shard resolution strategy (available in the ShardResolution companion object) returns the first character of the hexadecimal hash code generated from the aggregateId. This means up to 16 shards can be created (0-9 and A-F) (see http://guide.couchdb.org/draft/clustering.html#hashing to learn more about consistent hashing in the context of sharding).
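To make the composition concrete, here is a minimal plain-Scala sketch of a shard resolver built from a strategy and an id resolver. It assumes that the id resolver is a partial function from a message to an aggregate id and that the default strategy is computed with Integer.toHexString; both are assumptions of this sketch, as are the type aliases and the ReserveProduct field, and none of it is taken from the ShardResolution source.

```scala
object ShardResolutionSketch {
  type AggregateId             = String
  type ShardId                 = String
  type AggregateIdResolver     = PartialFunction[Any, AggregateId]
  type ShardResolutionStrategy = AggregateId => ShardId

  // Hypothetical command used only for the example.
  final case class ReserveProduct(reservationId: String)

  val aggregateIdResolver: AggregateIdResolver = {
    case ReserveProduct(reservationId) => reservationId
  }

  // Assumed default: first character of the hexadecimal form of the id's
  // hash code, upper-cased so that shard ids fall within 0-9 and A-F.
  val defaultStrategy: ShardResolutionStrategy =
    id => Integer.toHexString(id.hashCode).take(1).toUpperCase

  // shardResolver = shard resolution strategy composed with the id resolver.
  def shardResolver(
      strategy: ShardResolutionStrategy,
      idResolver: AggregateIdResolver): PartialFunction[Any, ShardId] =
    idResolver.andThen(strategy)
}
```

Because the strategy is just a function of the aggregate id, a different one can be plugged in without touching the id resolver.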

The shard resolution strategy in the test, however (see code above), is different. It just takes the last character of the aggregateId. Thus in the first test we expect the two reservations "reservation-1" and "reservation-2" to be assigned to clerks working on different nodes:


The instance of the reservation office is not our previously implemented Office actor but an actor created by akka-sharding (the helper method globalOffice takes care of that). During the test, two requests for creating the reservations "reservation-1" and "reservation-2" are sent from node 1. We expect that exactly one request will be processed on each node. After both requests are processed, we verify that subsequent requests are processed correctly when sent from a different node than the creation requests:


With this test we complete lesson 2.

In the next lesson we will try to build views so that our application can respond to queries.

14 April 2014

Reactive DDD with Akka

When comparing traditional and DDD/CQRS architectures (see my previous article) I said that the goal of a DDD model is to decompose a complex business domain into manageable pieces, taking into account scalability and consistency requirements. What this means is that by bringing in concepts like bounded contexts, transaction boundaries and event-based communication, DDD and CQRS are great enablers for building scalable software. But so far, the example services I have presented were supposed to run on top of a relational database and within global transactions. This is a very limiting architecture, not suited for building scalable software. Continuous consistency of the underlying data, guaranteed by global transactions, should not be perceived as a standard requirement of any system (enterprise-class systems included). It's an artificial requirement that we all got used to, but it does not address the real requirements of the customer. To fully benefit from a DDD/CQRS architecture we should change the underlying technology. Today we have a choice. There are a lot of NoSQL databases and there are a few platforms that treat scalability as a first-class concern. For the JVM, Akka (part of the Typesafe platform) is the most robust open-source platform for building event-driven applications. Recently the akka-persistence module has been released, which takes care of handling long-running/persistable processes (this is what Aggregate Roots and Sagas are all about). This is a great feature that allows thinking of Akka as a complete platform for building enterprise applications.

Let's then start building an event-driven, scalable, resilient and responsive (in short, reactive) application using Akka and other goodies from the Typesafe platform.
I have already started a project on GitHub. You are welcome to contribute!

Below is the first lesson I learned from the project and wanted to share with you. Hopefully more lessons will come (see open issues).

Lesson 1 - Aggregate root is an actor

The source code for lesson 1 is available here: https://github.com/pawelkaczor/ddd-leaven-akka/tree/Lesson1

The goal of lesson 1 is to learn how to build an event-sourced Aggregate Root (AR) with Akka.
The idea is simple. An Aggregate Root should be modeled as a stateful actor that accepts Commands and produces Events. Because an actor is message driven, we can send Command messages directly to the Aggregate Root, avoiding the "command to method call" transformation.

As already mentioned, akka-persistence provides the necessary artifacts for building persistable/stateful actors. The key component is the akka.persistence.Processor trait. A Processor is an actor capable of restoring its state (aka recovering) during reincarnation (start or restart of the actor). The underlying storage is a pluggable, append-only journal.

Command sourcing

Any message of type Persistent that comes to a processor is stored in the journal before it is processed. During recovery, persistent messages are replayed to the processor so that it can restore its internal state from these messages.
This pattern (called command sourcing) is not particularly applicable to Aggregate Roots because replaying a command that has not yet been validated is not desired.

Event sourcing

To build an AR we need to extend EventsourcedProcessor, which adds event sourcing capability (the Eventsourced trait) to the Processor trait: only produced events will be stored in the journal. This means we need to explicitly invoke the persist(event) method of the Eventsourced trait to store the produced event in the journal after the command message has been validated (by validation I mean ensuring that the AR's invariants will not be compromised by the command). Since the persist method persists events asynchronously (it does not block the current thread), it accepts a callback (event handler) as the second argument. The main responsibility of an event-sourced AR is to provide an event handler that updates the internal state of the AR and handles the event by publishing it and/or sending a response to the client/command sender. Handling of the event should be customizable.
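The difference between the two patterns is easier to see in a small model. The sketch below is plain Scala without akka-persistence and is not the project's code: the Journal class is an in-memory stand-in, its persist method runs the handler synchronously (the real one is asynchronous), and the CloseReservation command, the ReservationClosed event and the field names are hypothetical.

```scala
import scala.collection.mutable.ListBuffer

object EventSourcingSketch {
  sealed trait Command
  final case class ReserveProduct(productId: String) extends Command
  case object CloseReservation extends Command

  sealed trait Event
  final case class ProductReserved(productId: String) extends Event
  case object ReservationClosed extends Event

  // In-memory stand-in for the journal; only events are ever appended to it.
  final class Journal {
    private val stored = ListBuffer.empty[Event]
    def persist(event: Event)(handler: Event => Unit): Unit = {
      stored += event // store the event first...
      handler(event)  // ...then let the handler update the state
    }
    def events: List[Event] = stored.toList
  }

  final class Reservation(journal: Journal) {
    private var products = List.empty[String]
    private var closed   = false

    private def updateState(event: Event): Unit = event match {
      case ProductReserved(p) => products = p :: products
      case ReservationClosed  => closed = true
    }

    // Recovery: replay stored events, with no validation and no re-persisting.
    journal.events.foreach(updateState)

    // Validate the command; an event is persisted only if it is accepted.
    def handle(command: Command): Either[String, Event] = command match {
      case _ if closed => Left("reservation is closed")
      case ReserveProduct(p) =>
        val event = ProductReserved(p)
        journal.persist(event)(updateState)
        Right(event)
      case CloseReservation =>
        journal.persist(ReservationClosed)(updateState)
        Right(ReservationClosed)
    }

    def reservedProducts: List[String] = products.reverse
  }
}
```

A rejected command leaves no trace in the journal, so a second Reservation created over the same journal recovers exactly the state produced by the accepted commands.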

AggregateRoot trait

Let's see how to build an abstract event-sourced AggregateRoot class.

The abstract AggregateRoot keeps its state in a private variable of type AggregateState (abstract) and takes care of updating this variable whenever an event is produced/raised (raise method) or replayed (receiveRecovery method). The state itself (a concrete implementation of AggregateState) should be an immutable class implementing the method apply, which defines state transitions for each event (except initialization). Initialization of the state is performed by AggregateRootFactory, the abstract member of the AR that must be overridden in a concrete implementation of the AR. Initialization is event-driven as well, which means that AggregateRootFactory creates the initial state from an event. To complete the picture, the raise(event) method calls the persist method and, after the event is persisted, calls either the default handler or the handler provided as the second (optional) argument of the raise method. The default handler publishes the event to the event bus (provided by Akka) and sends an Acknowledged message back to the sender.
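The state part of this design can be sketched in isolation from the actor. The following is a minimal plain-Scala model, assuming that both the state's apply method and the factory are partial functions over events; the event fields, ReservationState and the next and recover helpers are hypothetical and only illustrate the idea.

```scala
object AggregateStateSketch {
  sealed trait Event
  final case class ReservationCreated(reservationId: String, clientId: String) extends Event
  final case class ProductReserved(reservationId: String, productId: String) extends Event

  // State transitions for every event except initialization.
  trait AggregateState {
    def apply: PartialFunction[Event, AggregateState]
  }

  // Initialization: creates the first state from the creation event.
  type AggregateRootFactory = PartialFunction[Event, AggregateState]

  // Immutable state; every transition returns a new instance.
  final case class ReservationState(clientId: String, products: List[String])
      extends AggregateState {
    def apply: PartialFunction[Event, AggregateState] = {
      case ProductReserved(_, productId) => copy(products = productId :: products)
    }
  }

  val factory: AggregateRootFactory = {
    case ReservationCreated(_, clientId) => ReservationState(clientId, Nil)
  }

  // The same step applies whether an event was just raised or is being replayed.
  def next(state: Option[AggregateState], event: Event): Option[AggregateState] =
    state match {
      case None    => Some(factory(event))
      case Some(s) => Some(s.apply(event))
    }

  // Recovery is a fold of the stored events over the empty state.
  def recover(events: Seq[Event]): Option[AggregateState] =
    events.foldLeft(Option.empty[AggregateState])(next)
}
```

Since the state never mutates in place, replaying the same events always yields the same state, which is what makes recovery after a restart safe.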

Reservation AR

Please take a look at the implementation of a concrete Aggregate Root (Reservation). The code should be self-explanatory. Command processing consists of validation and raising an event.

ReservationSpec verifies that the Reservation AR is in fact a stateful component, capable of handling the reservation process. The test simply sends several commands to the Reservation AR in a valid order and verifies that the expected events have been persisted. In the middle of the process the Reservation actor is restarted to verify that it preserves its state. And in fact it does, since subsequent commands are handled successfully.

Error handling

By default, if an exception of type java.lang.Exception is thrown by the actor, the actor is restarted by its supervisor (this is defined in the default SupervisionStrategy, which makes exceptions only for a few special cases such as ActorInitializationException and ActorKilledException, where the actor is stopped instead). Exceptions are not propagated to the command sender automatically, as you might expect. We can either catch exceptions and send them back to the sender from within the receiveCommand method, or send the exception from within the preRestart method, which takes the exception as its reason argument. Overriding the preRestart method seems to be the simpler approach. Now we can test whether exceptions are returned to the sender: ReservationFailuresSpec

In next lesson...

Currently the client needs to get a reference to a particular instance of an Aggregate Root before sending a command. It would be much easier for him if he could just send the command to some command gateway. This will be the topic of the next lesson.