11 June 2014

Reactive DDD with Akka - lesson 3 (Projections)

Don't miss: ddd-leaven-akka-v2 - runnable sample e-commerce application built from Akka-DDD artifacts described in this series.

Let's continue our adventure with Akka and DDD. So far we have concentrated on the write/command side of the system. We have covered topics like command processing, event storage, sharding etc. but haven't touched the other side of the system: the query side. Our goal in this lesson is to implement the infrastructure needed to create the components responsible for feeding the query-side database: projections. For the query side we will choose an SQL database, although there are other technologies worth considering (document or graph databases might be a better choice depending on requirements).

Our first task is to check whether Akka (akka-persistence in its current, still experimental version, to be precise) supports the concept of projections. As it turns out, Akka provides the View trait that defines an actor capable of receiving events from the journal of a particular processor (aggregate root - AR) instance. This sounds good, but unfortunately the concept of View is insufficient for building projections, which typically listen to an aggregated stream of events rather than the events of a particular AR instance. A workaround would be to register a view for a processor that receives events from all ARs of a given type, but in such a scenario events would be persisted twice and reliable event delivery (more on that later) would have to be ensured between the ARs and the aggregating processor.

What we should require from an "ideal" event store implementation is a DSL for event stream transformation and aggregation.

EventStore should be mentioned here as it provides an impressive set of methods for handling streams of events: Event Store as a Read Model, including stream aggregation (per category/AR type). You can configure EventStore Journal for Akka Persistence and try experimenting with projections (be prepared to write some JavaScript and JSON).

But what if we don't want to depend on any particular journal provider? Well, composing streams of events would be possible if journals were exposed through some functional (composable) interface rather than View. And this is one of the goals already established by the Akka team with the introduction of the akka-stream module. Akka-stream is an implementation of the recently announced standard for asynchronous, composable and non-blocking stream processing on the JVM: http://www.reactive-streams.org/ that models stream publishers and subscribers (as defined by the specification) as Akka actors. It also already provides a simple DSL (PersistentFlow) for creating event stream publishers that are backed by Views. As you can see here, event streams can be aggregated by merging stream publishers (although aggregation by category is not yet supported). Once akka-stream is released we should definitely make use of it and see how to implement projections as reactive components (subscribers of event streams).

For now we need to find a different solution. The only possibility left is to introduce a third-party broker and forward events to topics or queues configured within the broker. As we will see in the following section, integrating Akka with an external broker is much simpler than one could imagine. This approach will require enriching the AR with a Publisher component (trait) responsible for sending persisted events to a configured target actor. This is a perfect opportunity to learn more about message delivery in distributed systems.

Reliable event delivery

Sending events generated by an AR to any other actor (be it a broker gateway or the aggregating processor mentioned before) should be straightforward to implement, right? Yes, it is, if you accept that in case of a node crash some messages might be lost (despite the fact that the corresponding commands had been acknowledged to the client!). Akka does not guarantee that a message will be delivered (this is the so-called at-most-once delivery semantics) because reliable delivery does not come for free in distributed systems. To obtain at-least-once delivery semantics (the message is guaranteed to be delivered, but duplicates may occur) an acknowledgment protocol must be used. The sender must store the message before sending it and the receiver must acknowledge the message once it is received. If the acknowledgment is received, the sender must mark the previously stored message as confirmed. If the acknowledgment is not received within a configured time period, or the sender is restarted (for example as the result of a crash), the sender retries delivery. Because the acknowledgment message might be lost as well, the sender might resend a message that has already been delivered.

Akka-persistence provides at-least-once delivery semantics in its core. The journal plugin API defines methods for storing confirmation entries for messages that have been acknowledged. The acknowledgment and retry mechanism is encapsulated within the Channel component that must be used to send events from within a processor (AR) to the destination actor. Events must be wrapped in a Persistent envelope so that the receiver can acknowledge the message by simply calling the Persistent.acknowledge method.
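The acknowledgment protocol described above can be sketched without any Akka dependency. This is only a minimal in-memory model, not the Channel implementation: the names Envelope, Sender, Receiver and redeliver are hypothetical, and the "network" is just a function that may or may not return an acknowledgment.

```scala
import scala.collection.mutable

object AtLeastOnceSketch {
  // Hypothetical envelope: the delivery id lets the receiver acknowledge a specific message.
  final case class Envelope(deliveryId: Long, payload: String)

  // Receiver side: processes the message and returns the id it acknowledges.
  final class Receiver {
    val received = mutable.ArrayBuffer.empty[String]
    def deliver(env: Envelope): Long = {
      received += env.payload
      env.deliveryId
    }
  }

  // Sender side: stores every message before sending it and keeps it
  // until the matching acknowledgment arrives. `send` returns None when
  // the message or its acknowledgment got lost on the way.
  final class Sender(send: Envelope => Option[Long]) {
    private val unconfirmed = mutable.LinkedHashMap.empty[Long, Envelope]
    private var nextId = 0L

    def publish(payload: String): Unit = {
      nextId += 1
      val env = Envelope(nextId, payload)
      unconfirmed += (env.deliveryId -> env) // store first, then send
      attempt(env)
    }

    // Called after a timeout or a restart: resend everything still unconfirmed.
    def redeliver(): Unit = unconfirmed.values.toList.foreach(attempt)

    def pending: Int = unconfirmed.size

    private def attempt(env: Envelope): Unit =
      send(env).foreach(ackId => unconfirmed -= ackId) // mark as confirmed
  }
}
```

If the first acknowledgment is dropped, the message stays in the unconfirmed store, redeliver sends it again and the receiver sees it twice. This is exactly the duplicate that at-least-once semantics allows and that the receiving side has to tolerate.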


Thanks to the composability of traits we can put all the behavior required by a reliable AR (sender) into a separate component: ReliablePublisher. It extends the abstract EventPublisher that in turn extends the abstract EventHandler. We mix EventHandler into AggregateRoot, making event handling in the context of an AR as configurable as possible (useful for testing purposes). Please notice that it is possible to stop redelivery when the configured number of retries is exceeded. For that purpose RedeliverFailureListener has been registered on the channel to be notified if redelivery fails. The listener actor throws RedeliveryFailedException, which results in a restart of the parent actor (AR) (to make it work, the supervisorStrategy of the listener actor had to be adjusted). Inside the preRestart method (that ReliablePublisher overrides) we can trigger a compensation action and/or mark the failing event as deleted (to prevent continuation of redelivery after the AR is restarted).

To provide a destination for ReliablePublisher, its abstract member target must be defined at creation time (AggregateRootActorFactory should take care of this as shown here: ProductPublishingSpec). Finally we should verify that ReliablePublisher works as expected and is really reliable. You can find the test here: ReliablePublisherSpec.

Message exchange over queue

Now we will configure the infrastructure for event transmission over a durable queue. We will use Apache Camel to arrange an in-only message exchange between a component writing to the queue (producer) and a projection component reading from the queue (consumer). Thanks to akka-camel both components (producer and consumer) can be represented as ... (surprise! ;-) actors. EventMessageConfirmableProducer is the producer that receives event messages (notice that Events are published inside an EventMessage envelope) coming from ReliablePublisher (AR) and forwards them to the configured Camel endpoint (for example a JMS queue or topic) (see transformOutgoingMessage method). Once the event message is accepted by the queue (and persisted if the queue is durable) the producer acknowledges event reception to the publisher (ReliablePublisher) (see routeResponse method). Please notice that we chose to unwrap EventMessage from the ConfirmablePersistent envelope before putting it into the queue (so that the consumer does not have to do the unwrapping itself). ConfirmablePersistent still needs to be attached to the EventMessage so we convert it to a meta attribute.

Finally we can implement the projection as a Consumer actor, an actor that consumes messages from a Camel endpoint. The Projection actor simply applies the provided projection specification (ProjectionSpec) to the received event and finalizes the message exchange by sending either an acknowledgment or a failure. To prevent processing of duplicated events, a concrete implementation of ProjectionSpec must return the sequence number of the last processed event for the given aggregateId (see currentVersion method):
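A minimal sketch of this duplicate check, written in plain Scala and under several assumptions: the shapes of EventMessage and ProjectionSpec below are hypothetical (only the currentVersion name comes from the text), and InMemorySpec exists solely to exercise the logic.

```scala
import scala.collection.mutable

object ProjectionSketch {
  // Hypothetical message shape: an event plus the id and sequence number of its AR.
  final case class EventMessage(aggregateId: String, sequenceNr: Long, event: Any)

  // Hypothetical shape of a projection specification.
  trait ProjectionSpec {
    // Sequence number of the last event already applied for this aggregate, if any.
    def currentVersion(aggregateId: String): Option[Long]
    def apply(aggregateId: String, sequenceNr: Long, event: Any): Unit
  }

  sealed trait Outcome
  case object Applied extends Outcome
  case object Duplicate extends Outcome

  // Applies the event only if it is newer than what the view has already seen.
  def project(spec: ProjectionSpec, msg: EventMessage): Outcome =
    if (spec.currentVersion(msg.aggregateId).exists(_ >= msg.sequenceNr)) Duplicate
    else {
      spec.apply(msg.aggregateId, msg.sequenceNr, msg.event)
      Applied
    }

  // In-memory spec used only to try the sketch out.
  final class InMemorySpec extends ProjectionSpec {
    private val versions = mutable.Map.empty[String, Long]
    val applied = mutable.ArrayBuffer.empty[Any]
    def currentVersion(aggregateId: String): Option[Long] = versions.get(aggregateId)
    def apply(aggregateId: String, sequenceNr: Long, event: Any): Unit = {
      applied += event
      versions(aggregateId) = sequenceNr
    }
  }
}
```

In both outcomes the exchange can be acknowledged: a duplicate has already been applied, so rejecting it would only trigger another redelivery.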

Before pulling all the pieces together we need to register a concrete broker as a component of Camel. We will use ActiveMQ. Configuration of the ActiveMQ component is straightforward (see ActiveMQMessaging trait). And of course we need a runner class (EmbeddedActiveMQRunner) that starts the broker as an embedded service.

Implementing projection 

Now we can implement a concrete projection specification. Let's model a ProductCatalog service inside the Sales module (context) that maintains a list of products with prices. Whenever a new Product is added to the inventory (within the Inventory module/context) it should also be added to the product catalog in the Sales module. Thus we need a projection that listens to InventoryQueue and updates the product catalog. We will skip the implementation details of ProductCatalog as accessing a relational db is not a very interesting topic (we use Slick for that purpose). InventoryProjection just calls the insert method of ProductCatalog, providing Product data copied from the ProductAdded event and an empty price.
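To make the idea concrete, here is a minimal sketch in plain Scala. It is not the code from the project: the fields of ProductAdded and ProductData, the handle method and the in-memory ProductCatalog (standing in for the Slick-backed one) are assumptions made for illustration.

```scala
import scala.collection.mutable

object InventoryProjectionSketch {
  // Hypothetical event and read-model row; the field names are assumptions.
  final case class ProductAdded(productId: String, name: String)
  final case class ProductData(productId: String, name: String, price: Option[BigDecimal])

  // In-memory stand-in for the relational product catalog.
  final class ProductCatalog {
    private val rows = mutable.LinkedHashMap.empty[String, ProductData]
    def insert(product: ProductData): Unit = rows.update(product.productId, product)
    def find(productId: String): Option[ProductData] = rows.get(productId)
  }

  // The projection copies product data from the event and leaves the price empty.
  final class InventoryProjection(catalog: ProductCatalog) {
    def handle(event: Any): Unit = event match {
      case ProductAdded(id, name) => catalog.insert(ProductData(id, name, price = None))
      case _                      => () // other inventory events do not affect the catalog
    }
  }
}
```

The price is modeled as an Option so that "not priced yet" is explicit in the Sales context instead of being encoded as zero.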

An integration test is available here: EventsPublishingClusterSpec. From the first node we send two AddProduct commands. The commands are handled by the Product AR within the inventory context and the generated events are published (reliably) to InventoryQueue, which is available from all nodes in the cluster. On the second node we register InventoryProjection, wait for 1 second (so that published events have time to reach the queue) and then send a GetProduct query message to ProductFinder to check if the expected product has been added to the product catalog.

And surprisingly test succeeds ;)!

Last but not least, I added experimental feature to our application:
If command sender wants to be notified once the view has been updated he can request special delivery receipt!

Please see ProductAcknowledgedPublicationSpec:


As homework you can find out how the feature is implemented and let me know your opinion about the idea :)

30 April 2014

Reactive DDD with Akka - lesson 2

Recap of lesson 1

In the previous lesson we managed to implement an abstraction of an event sourced Aggregate Root (AR) using akka-persistence and used it to develop a concrete AR class representing the Reservation process. We have shown that an AR is a message driven component that reacts to command messages. If a command (e.g. ReserveProduct) is accepted, the AR produces an event message representing this fact (e.g. ProductReserved) and once the event is persisted the AR sends back an acknowledgment message and propagates the event to the outside world. If the command is rejected, the AR simply responds with a failure message. All messaging happens asynchronously.

Inside AR we have separated concept of AR's state (AggregateState) and concept of AR's state factory (AggregateRootFactory), both modeled as state machines that compute next state based on the current state and incoming event.

The separation of the AR actor and its internal state allows looking at AR creation from two different perspectives. The first one is domain-centric: AggregateRootFactory encapsulates domain logic only. The second one is technical: an AR is an actor that lives somewhere in the actor system, thus, for example, it should be provided with references to other actors it needs to communicate with. But how is the AR actor actually created? We have not touched this topic in lesson 1. It's time to clear it up. The complete code for lesson 2 is available here.

Aggregate Root actor creation

In a traditional DDD/CQRS architecture, a command handler is responsible for creating the AR and adding it to a repository. When handling subsequent commands, the command handler instantiates the AR by fetching it from the repository. With our new architecture we need to rethink these patterns. First of all, we don't need another command handling component since the AR actor is already a command handler. Secondly, a repository of actors does not seem to be a good abstraction. Actors are created, started, killed and restarted, but not added to or loaded from a repository. In Akka, they are created and supervised by their parents. Let's see how the client's interaction with the Reservation process can be simplified if we make use of the actor supervision pattern.

Don't call me, call my office

Going back to our sales domain and reservation process from lesson 1, each client, before creating a reservation, first had to obtain reference to actor instance (reservation actor) responsible for handling this particular reservation process. We created a method getReservationActor for that purpose:

From the client's perspective this pattern is more complicated than it should be. The client just wants to send a command to an actor that knows how to deal with it. He should not be troubled with finding an actor he can talk to. I think it's pretty obvious now that we need a single actor serving all reservation related requests from the clients. The actor should hide the logic of passing messages between clients and reservation actors, which means it should be able to manage reservation actors (create them and kill them on demand or in case of failure). In Akka this kind of relationship is called parental supervision. All actors in Akka (excluding internal top-level actors) are created by other actors (their parents) that become their supervisors.
To introduce the pattern we have just discovered into our reservation domain we need to find a friendly and meaningful name for it. The client should not have to contact the "supervising parent of Reservation actors" but rather he should be talking to the reservation office.

Office and clerks

Reservation office takes care of handling reservation requests from the clients. The actual job is performed by clerks. Clerks are working on cases they are assigned to (where case id is aggregate id, example: "reservation-1"). AR actor is thus a clerk with assigned case.
I find this analogy quite useful but if you have a better one just drop me a comment.

The implementation of the Office actor class should be generic, not tied to a particular domain. When a request (command) comes in, the office should resolve/extract the case id from the command and either find a clerk that is currently working on this case or (if there is no such clerk) create a new clerk and assign him the case (this translates to creating a child actor with a name equal to the case id). The problem is that a generic office needs to resolve the case identifier from a command that can be of any type. To make this possible we first need to make the Office class parameterized (Office[T <: AggregateRoot]) and then apply the type class pattern to inject a function Command => AggregateId (AggregateIdResolver) via an implicit constructor parameter into the concrete Office instance. In other words, creation of an office for any class of Aggregate is possible, assuming there is an implicitly available implementation of AggregateIdResolver for that class.

With the following declaration AggregateIdResolver for Reservation becomes available:
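A minimal sketch of what such a declaration might look like, in plain Scala. It only illustrates the type class pattern: the shape of AggregateIdResolver, the command classes and the helper caseIdFor are assumptions, not the code from the project.

```scala
object IdResolutionSketch {
  type AggregateId = String

  // Type class: knows how to extract the aggregate (case) id from commands
  // addressed to aggregates of type A.
  trait AggregateIdResolver[A] {
    def resolve(command: Any): AggregateId
  }

  // Hypothetical commands of the reservation process.
  sealed trait ReservationCommand { def reservationId: String }
  final case class CreateReservation(reservationId: String, clientId: String)
      extends ReservationCommand
  final case class ReserveProduct(reservationId: String, productId: String, quantity: Int)
      extends ReservationCommand

  // Placeholder for the Reservation aggregate root.
  final class Reservation

  object Reservation {
    // Declared in the companion object, so it is found implicitly wherever
    // an AggregateIdResolver[Reservation] is required.
    implicit val idResolver: AggregateIdResolver[Reservation] =
      new AggregateIdResolver[Reservation] {
        def resolve(command: Any): AggregateId = command match {
          case c: ReservationCommand => c.reservationId
          case other => throw new IllegalArgumentException(s"Unsupported command: $other")
        }
      }
  }

  // A generic component (such as an office) only asks for the resolver implicitly.
  def caseIdFor[A](command: Any)(implicit resolver: AggregateIdResolver[A]): AggregateId =
    resolver.resolve(command)
}
```

With this in place, IdResolutionSketch.caseIdFor[IdResolutionSketch.Reservation](command) compiles without passing the resolver explicitly, and asking for an aggregate type without a resolver fails at compile time rather than at runtime.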

Recipe for actor

Let's talk about actor creation in detail. Actors in Akka are created from recipes (Props class). Recipes can be created in different ways, for example by providing the class of the actor to be created and its constructor parameters (if any). This way a generic office actor can construct the recipe of the actor to be created if it knows its class and constructor parameters.

Now we can complete implementation of office actor. After obtaining reference to clerk actor, office actor must just forward the command to the clerk. Please see code of the Office class below:
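The find-or-create-then-forward logic can be modeled without Akka. The sketch below is a simplified, synchronous stand-in and not the Office actor itself: Clerk, Office and their methods are hypothetical names, and the clerk recipe is reduced to a plain function.

```scala
import scala.collection.mutable

object OfficeSketch {
  // A clerk handles all commands of a single case (aggregate id).
  final class Clerk(val caseId: String) {
    val handled = mutable.ArrayBuffer.empty[Any]
    def receive(command: Any): Unit = { handled += command }
  }

  // Generic office: resolves the case id, finds or creates the clerk
  // and forwards the command to it.
  final class Office(resolveCaseId: Any => String, newClerk: String => Clerk) {
    private val clerks = mutable.Map.empty[String, Clerk]

    def receive(command: Any): Unit = {
      val caseId = resolveCaseId(command)
      // Reuse the clerk already working on the case, otherwise hire a new one.
      val clerk = clerks.getOrElseUpdate(caseId, newClerk(caseId))
      clerk.receive(command)
    }

    def clerkCount: Int = clerks.size
    def clerkFor(caseId: String): Option[Clerk] = clerks.get(caseId)
  }
}
```

In an actor-based office the map lookup would roughly correspond to looking up a child actor by name and creating it from Props when it is missing, and clerk.receive would correspond to forwarding the message so that the original sender is preserved.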

We still need a helper function that creates the office as a top-level actor:

Finally the client can send reservation commands to reservation office instead of individual clerk:

Firing of clerks 

The question arises what happens to clerk actors after they finish processing the command(s) and become idle. Well, nothing, they live (in memory) until they are stopped. Thus, we should take care of dismissing clerks that are idle. This can be easily achieved by defining an inactivity timeout after which the clerk is notified with a ReceiveTimeout message. Being notified, the clerk should dismiss (stop) himself or, to avoid dropping commands just enqueued in his inbox, ask the office to dismiss him gracefully. This pattern/protocol is called graceful passivation. Because all Aggregate Roots should support this pattern, the AggregateRoot class has been enriched with the GracefulPassivation trait. Currently the Office class does not fully implement graceful passivation of clerks (clerks are stopped, but enqueued messages might be lost) but don't worry, Akka provides a much more robust implementation of the office pattern: akka-sharding. Akka-sharding not only supports graceful passivation, but first of all allows the office to work in a distributed environment.

Global office

A single office is limited by available resources (cpu, memory) so it can handle only a limited number of requests concurrently. A logical global office consisting of branch offices distributed among different locations will perform much better under high workload. Each branch office should be delegated to work on a subset of cases. This is the idea of sharding, where a branch office is a shard. Akka-sharding takes care of distributing shards among the available nodes in the cluster according to a given shard allocation strategy. The default strategy allocates a new shard to the node with the least number of allocated shards. The intention is to distribute the workload evenly across all nodes in the cluster. If new nodes are added to the cluster, shards are rebalanced. Because shards are not tied to a single machine, the state of the actors needs to be distributed as well. This means akka-persistence must be configured with a distributed journal.
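The "least number of allocated shards" rule is easy to express as a pure function. The following is only an illustration of that rule, not the akka-sharding implementation; the names allocate and distribute and the use of plain strings for nodes and shards are assumptions.

```scala
object ShardAllocationSketch {
  // Picks the node that currently hosts the fewest shards.
  def allocate(currentAllocations: Map[String, Set[String]]): String = {
    require(currentAllocations.nonEmpty, "at least one node is required")
    currentAllocations.minBy { case (_, shards) => shards.size }._1
  }

  // Assigns shards one by one, always to the least loaded node.
  def distribute(nodes: Seq[String], shards: Seq[String]): Map[String, Set[String]] = {
    val empty = nodes.map(node => node -> Set.empty[String]).toMap
    shards.foldLeft(empty) { (allocations, shard) =>
      val node = allocate(allocations)
      allocations.updated(node, allocations(node) + shard)
    }
  }
}
```

Distributing four shards over two nodes this way leaves two shards on each node, which is the even spread the strategy aims for.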

Let's see how global reservation office can be created in Akka using akka-sharding.

Testing global reservation office

Thanks to the sbt-multi-jvm plug-in and MultiNodeSpec, writing cluster-aware tests in Akka is surprisingly easy. We will not dive into the details of cluster configuration but concentrate on sharding. ReservationGlobalOfficeSpec is the test we are going to discuss now. As you can see, at startup a shared journal is configured and the cluster is started (consisting of two nodes). Please note that the sbt-multi-jvm plug-in will automatically execute the same test on each node concurrently.

The following method is executed on each node to start sharding:

startSharding is a method that actually starts sharding by invoking

Akka-sharding requires IdExtractor and ShardResolver to be defined for the AR that is going to be sharded. The logic of IdExtractor we have already encapsulated in AggregateIdResolution. Now we will introduce the abstract class ShardResolution that extends AggregateIdResolution and defines shardResolver, which extracts the shard id from the command. This logic is implemented as a composition of shardResolutionStrategy and aggregateIdResolver:

Default shard resolution strategy (available in ShardResolution companion object) returns first char from hexadecimal hashcode generated from aggregateId. This means up to 16 shards can be created (0-9 and A-F) (see http://guide.couchdb.org/draft/clustering.html#hashing to learn more about consistent hashing in context of sharding).
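A minimal sketch of such a composition, under simplifying assumptions: here a strategy is just a function from aggregate id to shard id, the type aliases are hypothetical, and the hexadecimal hash is assumed to be produced with Integer.toHexString. The lastChar strategy is an additional, hypothetical example of a custom strategy.

```scala
object ShardResolutionSketch {
  type AggregateId = String
  type ShardId = String
  type AggregateIdResolver = Any => AggregateId
  type ShardResolutionStrategy = AggregateId => ShardId

  // Assumed default: first character of the hexadecimal hash code of the aggregate id.
  val firstHexChar: ShardResolutionStrategy =
    id => Integer.toHexString(id.hashCode).take(1)

  // Alternative strategy that keys on the last character of the aggregate id.
  val lastChar: ShardResolutionStrategy =
    id => id.takeRight(1)

  // The shard resolver first extracts the aggregate id from the command
  // and then maps that id to a shard id.
  def shardResolver(
      strategy: ShardResolutionStrategy,
      idResolver: AggregateIdResolver
  ): Any => ShardId =
    idResolver andThen strategy
}
```

Because the strategy only sees the aggregate id, every command addressed to the same aggregate ends up in the same shard, regardless of the command type.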

Shard resolution strategy in the test however (see code above) is different. It takes just last character from aggregateId. Thus in the first test we expect two reservations "reservation-1" and "reservation-2" being assigned to clerks working on different nodes:

The instance of the reservation office is not our previously implemented Office actor but an actor created by akka-sharding (the helper method globalOffice takes care of that). During the test two requests for creating reservations "reservation-1" and "reservation-2" are sent from node 1. We expect that exactly one request will be processed on each node. After both requests are processed, we verify that subsequent requests are correctly processed when sent from a different node than the creation requests:

With this test we complete lesson 2.

In next lesson we will try to build views so that our application  can respond to queries.

14 April 2014

Reactive DDD with Akka

When comparing traditional and DDD/CQRS architectures (see my previous article) I said that the goal of the DDD model is to decompose a complex business domain into manageable pieces, taking into account scalability and consistency requirements. What this means is that by bringing in concepts like bounded contexts, transaction boundaries and event based communication, DDD and CQRS are great enablers for building scalable software. But so far, the example services I have presented were supposed to be run on top of a relational database and within global transactions. This is a very limiting architecture, not suited for building scalable software. Continuous consistency of the underlying data, guaranteed by global transactions, should not be perceived as a standard requirement of any (including enterprise-class) system. It's an artificial requirement that we all got used to but that does not address the real requirements of the customer. To fully benefit from the DDD/CQRS architecture we should change the underlying technology. Today we have a choice. There are a lot of NoSQL databases and there are a few platforms that address scalability as a first concern. For the JVM, Akka (part of the Typesafe platform) is the most robust open-source platform for building event-driven applications. Recently the akka-persistence module has been released that takes care of handling long-running/persistable processes (this is what Aggregate Roots and Sagas are all about). This is a great feature that allows thinking of Akka as a complete platform for building enterprise applications.

Let's then start building an event-driven, scalable, resilient and responsive (in short: reactive) application using Akka and other goodies from the Typesafe platform.
I have already started a project on Github. You are welcome to contribute!

Below is the first lesson I learned from the project and wanted to share it with you. Hopefully more lessons will come (see open issues).

Lesson 1 - Aggregate root is an actor

The source code for lesson 1 is available here: https://github.com/pawelkaczor/ddd-leaven-akka/tree/Lesson1

The goal of lesson 1 is to learn how to build event sourced Aggregate Root (AR) with Akka.
The idea is simple. Aggregate Root should be modeled as stateful actor that accepts Commands and produces Events. Because actor is message driven, we can send Command messages directly to Aggregate Root avoiding "command to method call" transformation.

As already mentioned akka-persistence provides necessary artifacts for building persistable/stateful actors. The key component is akka.persistence.Processor trait. Processor is an actor capable of restoring its state (aka recovering) during reincarnation (start or restart of the actor). The type of underlying storage is append-only pluggable journal.

Command sourcing

Any message of type Persistent that comes to a processor is stored in a journal before it is processed. During recovery, persistent messages are replayed to the processor so that it can restore internal state from these messages.
This pattern (called Command sourcing) is not particularly applicable to Aggregate Roots because replaying a command that has not yet been validated is not desired.

Event sourcing

To build AR we need to extend from EventsourcedProcessor that adds event sourcing capability (Eventsourced trait) to Processor trait - only produced events will be stored in the journal. This means we need to explicitly invoke persist(event) method of Eventsourced trait to store produced event in the journal after command message has been validated (by validation I mean ensuring AR's invariants will not be compromised by the command). Since persist method persists events asynchronously (does not block the current thread) it accepts a callback (event handler) as the second argument. Main responsibility of event sourced AR is to provide event handler that will update internal state of AR and handle the event by publishing it and/or sending a response to the client/command sender. Handling of event should be customizable.
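The validate, persist, then handle sequence can be illustrated without akka-persistence. The sketch below is a synchronous, in-memory model only: the journal is a plain buffer, persist runs its handler immediately (the real persist is asynchronous), and ReservationProcessor, ReserveProduct's fields and quantityOf are hypothetical.

```scala
import scala.collection.mutable

object EventSourcingSketch {
  sealed trait Event
  final case class ProductReserved(productId: String, quantity: Int) extends Event

  // Hypothetical command.
  final case class ReserveProduct(productId: String, quantity: Int)

  // In-memory stand-in for an event sourced processor.
  final class ReservationProcessor(journal: mutable.ArrayBuffer[Event]) {
    private var reserved = Map.empty[String, Int]

    // Recovery: replay the stored events; commands are never replayed.
    journal.foreach(updateState)

    private def updateState(event: Event): Unit = event match {
      case ProductReserved(id, quantity) =>
        reserved = reserved.updated(id, reserved.getOrElse(id, 0) + quantity)
    }

    // Stores the event first, then invokes the handler with the stored event.
    private def persist(event: Event)(handler: Event => Unit): Unit = {
      journal += event
      handler(event)
    }

    // Only events produced by validated commands reach the journal.
    def handle(command: ReserveProduct): Either[String, Event] =
      if (command.quantity <= 0) Left("quantity must be positive")
      else {
        val event = ProductReserved(command.productId, command.quantity)
        persist(event)(updateState)
        Right(event)
      }

    def quantityOf(productId: String): Int = reserved.getOrElse(productId, 0)
  }
}
```

A rejected command leaves no trace in the journal, so a processor recreated from the same journal ends up in the same state without ever re-running validation.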

AggregateRoot trait

Let's see how to build abstract event sourced AggregateRoot class.

The abstract AggregateRoot keeps its state in a private variable of type AggregateState (abstract) and takes care of updating this variable whenever an event is produced/raised (raise method) or replayed (receiveRecovery method). The state itself (a concrete implementation of AggregateState) should be an immutable class implementing the apply method that defines state transitions for each event (except initialization). Initialization of the state is performed by AggregateRootFactory - the abstract member of the AR that must be overridden in a concrete implementation of the AR. Initialization is event-driven as well, which means that AggregateRootFactory creates the initial state from an event. To complete the picture, the raise(event) method calls the persist method and, after the event is persisted, it calls either the default handler or the handler provided as the second (optional) argument of the raise method. The default handler publishes the event to the event bus (provided by Akka) and sends an Acknowledged message back to the sender.
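The relation between the state, the factory and the events can be sketched as a small state machine in plain Scala. The event names ReservationCreated and ReservationClosed, the fields of ReservationState and the helpers next and replay are assumptions made for illustration; only AggregateState and the idea of a factory come from the description above.

```scala
object AggregateStateSketch {
  sealed trait Event
  final case class ReservationCreated(reservationId: String) extends Event
  final case class ProductReserved(productId: String, quantity: Int) extends Event
  case object ReservationClosed extends Event

  // Immutable state: apply computes the next state from an event.
  trait AggregateState {
    def apply(event: Event): AggregateState
  }

  final case class ReservationState(id: String, items: Map[String, Int], closed: Boolean)
      extends AggregateState {
    def apply(event: Event): AggregateState = event match {
      case ProductReserved(productId, quantity) =>
        copy(items = items.updated(productId, items.getOrElse(productId, 0) + quantity))
      case ReservationClosed =>
        copy(closed = true)
      case other =>
        throw new IllegalStateException(s"Unexpected event: $other")
    }
  }

  // Factory: creates the initial state from the first (creation) event.
  val factory: PartialFunction[Event, AggregateState] = {
    case ReservationCreated(id) => ReservationState(id, Map.empty, closed = false)
  }

  // The same transition is used whether an event has just been raised or is being replayed.
  def next(state: Option[AggregateState], event: Event): Option[AggregateState] =
    Some(state.fold(factory(event))(_.apply(event)))

  def replay(events: Seq[Event]): Option[AggregateState] =
    events.foldLeft(Option.empty[AggregateState])(next)
}
```

Since the transition function is shared by raising and replaying, folding the journal over an empty state reproduces exactly the state the aggregate had before a restart.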

Reservation AR

Please take a look at implementation of concrete Aggregate Root (Reservation). The code should be self explanatory. Command processing consists of validation and raising an event.

ReservationSpec verifies that the Reservation AR is in fact a stateful component, capable of handling the reservation process. The test simply sends several commands to the Reservation AR in a valid order and verifies that the expected events have been persisted. In the middle of the process the Reservation actor is restarted to verify that it preserves its state. And in fact it does, since subsequent commands are handled successfully.

Error handling

By default, if any exception of type java.lang.Exception is thrown by the actor, the actor is restarted by its supervisor (this is defined in the default SupervisorStrategy). Exceptions are not propagated to the command sender automatically as you might expect. We can either catch exceptions and send them back to the sender from within the receiveCommand method or send the exception from within the preRestart method, which takes the exception as its reason argument. Overriding the preRestart method seems to be the simpler approach. Now we can test if exceptions are returned to the sender: ReservationFailuresSpec

In next lesson...

Currently the client needs to get a reference to particular instance of Aggregate Root before sending the command. It would be much easier for him if he could just send the command to some command gateway. This will be the topic of the next lesson.