April 14, 2014

Reactive DDD with Akka

When comparing traditional and DDD/CQRS architectures (see my previous article), I said that the goal of a DDD model is to decompose a complex business domain into manageable pieces, taking scalability and consistency requirements into account. In other words, by introducing concepts like bounded contexts, transaction boundaries and event-based communication, DDD and CQRS are great enablers for building scalable software. But so far, the example services I have presented were meant to run on top of a relational database and within global transactions. This is a very limiting architecture, not suited for building scalable software. Continuous consistency of the underlying data, as guaranteed by global transactions, should not be perceived as a standard requirement of any system (enterprise-class systems included). It is an artificial requirement that we all got used to, but it does not address the real requirements of the customer.

To fully benefit from a DDD/CQRS architecture we should change the underlying technology. Today we have a choice. There are many NoSQL databases and a few platforms that treat scalability as a first-class concern. For the JVM, Akka (part of the Typesafe platform) is the most robust open-source platform for building event-driven applications. The recently released akka-persistence module takes care of handling long-running/persistable processes (this is what Aggregate Roots and Sagas are all about). This is a great feature that allows us to think of Akka as a complete platform for building enterprise applications.

Let's then start building an event-driven, scalable, resilient and responsive (in short, reactive) application using Akka and other goodies from the Typesafe platform.
I have already started a project on GitHub. You are welcome to contribute!

Below is the first lesson I learned from the project and want to share with you. Hopefully more lessons will come (see open issues).

Lesson 1 - Aggregate root is an actor

The source code for lesson 1 is available here: https://github.com/pawelkaczor/ddd-leaven-akka/tree/Lesson1

The goal of lesson 1 is to learn how to build an event sourced Aggregate Root (AR) with Akka.
The idea is simple. An Aggregate Root should be modeled as a stateful actor that accepts Commands and produces Events. Because an actor is message driven, we can send Command messages directly to the Aggregate Root, avoiding the "command to method call" transformation.
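
To make the idea concrete, here is a minimal sketch in plain Scala (no Akka dependency). The message types and the ReservationSketch class are hypothetical names invented for illustration, not code from the project. The receive method stands in for an actor's message handler: it pattern matches on the command itself, so there is no separate layer translating commands into method calls.

```scala
// Hypothetical message types for illustration; not taken from the project.
sealed trait Command
case class CreateReservation(reservationId: String, clientId: String) extends Command
case class CloseReservation(reservationId: String) extends Command

sealed trait Event
case class ReservationCreated(reservationId: String, clientId: String) extends Event
case class ReservationClosed(reservationId: String) extends Event

// Stand-in for an actor: a stateful component whose only entry point
// is a message handler.
class ReservationSketch {
  private var created = false
  private var closed = false

  // Plays the role of an actor's receive: the command is matched directly
  // and the produced event is returned.
  def receive(command: Command): Event = command match {
    case CreateReservation(id, clientId) =>
      require(!created, "reservation already exists")
      created = true
      ReservationCreated(id, clientId)
    case CloseReservation(id) =>
      require(created && !closed, "reservation is not open")
      closed = true
      ReservationClosed(id)
  }
}

object Lesson1MessagesDemo {
  def main(args: Array[String]): Unit = {
    val reservation = new ReservationSketch
    println(reservation.receive(CreateReservation("r-1", "c-1")))
    println(reservation.receive(CloseReservation("r-1")))
  }
}
```

In a real actor the event would be sent as a message rather than returned, and the state would have to survive restarts, which is what the rest of this lesson is about.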

As already mentioned, akka-persistence provides the necessary artifacts for building persistable/stateful actors. The key component is the akka.persistence.Processor trait. A Processor is an actor capable of restoring its state (aka recovering) during reincarnation (start or restart of the actor). The underlying storage is an append-only, pluggable journal.

Command sourcing

Any message of type Persistent that comes to a processor is stored in a journal before it is processed. During recovery, persistent messages are replayed to the processor so that it can restore its internal state from these messages.
This pattern (called Command sourcing) is not particularly applicable to Aggregate Roots because replaying a command that has not yet been validated is not desired.

Event sourcing

To build an AR we need to extend EventsourcedProcessor, which adds event sourcing capability (the Eventsourced trait) to the Processor trait: only produced events are stored in the journal. This means we need to explicitly invoke the persist(event) method of the Eventsourced trait to store the produced event in the journal after the command message has been validated (by validation I mean ensuring that the AR's invariants will not be compromised by the command). Since the persist method persists events asynchronously (it does not block the current thread), it accepts a callback (event handler) as the second argument. The main responsibility of an event sourced AR is to provide an event handler that updates the internal state of the AR and handles the event by publishing it and/or sending a response to the client/command sender. Handling of the event should be customizable.
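
The validate, persist, then handle sequence can be sketched without Akka as follows. This is only an illustration under stated assumptions: InMemoryJournal, ReserveProduct and ProductReserved are hypothetical names, and the journal here invokes the handler synchronously, whereas the real persist is asynchronous.

```scala
import scala.collection.mutable.ArrayBuffer

object EventSourcingSketch {
  // Hypothetical command and event, invented for this example.
  case class ReserveProduct(productId: String, quantity: Int)

  sealed trait Event
  case class ProductReserved(productId: String, quantity: Int) extends Event

  // In-memory stand-in for the journal. Unlike the real persist, it calls
  // the handler immediately after appending the event.
  class InMemoryJournal {
    val events = ArrayBuffer.empty[Event]
    def persist(event: Event)(handler: Event => Unit): Unit = {
      events += event
      handler(event)
    }
  }

  class ReservationSketch(journal: InMemoryJournal) {
    private var reserved = Map.empty[String, Int]

    def handle(command: ReserveProduct): Unit = {
      // 1. Validate the command first; a rejected command never reaches the journal.
      require(command.quantity > 0, "quantity must be positive")
      // 2. Persist the event. 3. The handler updates state once the event is stored.
      journal.persist(ProductReserved(command.productId, command.quantity)) {
        case ProductReserved(productId, quantity) =>
          reserved = reserved.updated(productId, reserved.getOrElse(productId, 0) + quantity)
      }
    }

    def reservedQuantity(productId: String): Int = reserved.getOrElse(productId, 0)
  }

  def main(args: Array[String]): Unit = {
    val journal = new InMemoryJournal
    val reservation = new ReservationSketch(journal)
    reservation.handle(ReserveProduct("p-1", 2))
    // An invalid command fails validation, so nothing is written to the journal.
    val rejected = scala.util.Try(reservation.handle(ReserveProduct("p-1", 0)))
    println(s"rejected: ${rejected.isFailure}, journal: ${journal.events.toList}")
  }
}
```

Because only events that passed validation end up in the journal, replaying the journal can never re-run an invalid command, which is the difference from command sourcing.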

AggregateRoot trait

Let's see how to build an abstract event sourced AggregateRoot class.

The abstract AggregateRoot keeps its state in a private variable of type AggregateState (abstract) and takes care of updating this variable whenever an event is produced/raised (raise method) or replayed (receiveRecover method). The state itself (a concrete implementation of AggregateState) should be an immutable class implementing the apply method, which defines the state transition for each event (except initialization). Initialization of the state is performed by AggregateRootFactory, the abstract member of the AR that must be overridden in the concrete implementation of the AR. Initialization is event-driven as well, which means that AggregateRootFactory creates the initial state from an event. To complete the picture, the raise(event) method calls the persist method and, after the event is persisted, calls either the default handler or the handler provided as the second (optional) argument of the raise method. The default handler publishes the event to the event bus (provided by Akka) and sends an Acknowledged message back to the sender.
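
The shape of such an abstraction can be sketched in plain Scala as shown below. This is a simplified illustration, not the project's AggregateRoot trait: the journal is an in-memory buffer, recover() stands in for replay during recovery, the default handler only prints instead of publishing to an event bus and acknowledging the sender, and all concrete names (Reservation, ReservationState and the events) are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object AggregateRootSketch {
  trait DomainEvent

  // Immutable state; apply defines the transition for every event after initialization.
  trait AggregateState {
    def apply(event: DomainEvent): AggregateState
  }

  abstract class AggregateRoot(journal: ArrayBuffer[DomainEvent]) {
    private var stateOpt: Option[AggregateState] = None

    // Creates the initial state from the first event; supplied by the concrete AR.
    def factory: PartialFunction[DomainEvent, AggregateState]

    protected def initialized: Boolean = stateOpt.isDefined
    protected def state: AggregateState =
      stateOpt.getOrElse(throw new IllegalStateException("aggregate not initialized"))

    private def updateState(event: DomainEvent): Unit = {
      val next = stateOpt match {
        case Some(current) => current(event)
        case None          => factory(event)
      }
      stateOpt = Some(next)
    }

    // Stand-in for "publish to the event bus and acknowledge the sender".
    protected val defaultHandler: DomainEvent => Unit =
      event => println(s"published: $event")

    // Stores the event, updates the state, then runs the default or a custom handler.
    protected def raise(event: DomainEvent, handler: DomainEvent => Unit = defaultHandler): Unit = {
      journal += event
      updateState(event)
      handler(event)
    }

    // Replays stored events to rebuild the state; handlers are not invoked.
    def recover(): Unit = journal.foreach(updateState)
  }

  case class ReservationCreated(id: String) extends DomainEvent
  case class ProductReserved(productId: String) extends DomainEvent

  case class ReservationState(id: String, products: List[String]) extends AggregateState {
    def apply(event: DomainEvent): AggregateState = event match {
      case ProductReserved(productId) => copy(products = productId :: products)
      case other => throw new IllegalArgumentException(s"unexpected event: $other")
    }
  }

  class Reservation(journal: ArrayBuffer[DomainEvent]) extends AggregateRoot(journal) {
    val factory: PartialFunction[DomainEvent, AggregateState] = {
      case ReservationCreated(id) => ReservationState(id, Nil)
    }

    def create(id: String): Unit = {
      require(!initialized, "reservation already created")
      raise(ReservationCreated(id))
    }

    def reserve(productId: String): Unit = {
      require(initialized, "reservation does not exist")
      raise(ProductReserved(productId))
    }

    def products: List[String] = state match {
      case ReservationState(_, reservedProducts) => reservedProducts
      case _                                     => Nil
    }
  }

  def main(args: Array[String]): Unit = {
    val journal = ArrayBuffer.empty[DomainEvent]
    val reservation = new Reservation(journal)
    reservation.create("r-1")
    reservation.reserve("p-1")

    // Simulate a restart: a fresh instance rebuilds its state from the journal.
    val restarted = new Reservation(journal)
    restarted.recover()
    println(restarted.products)
  }
}
```

Keeping the state immutable and routing both raise and replay through the same update function means live processing and recovery cannot drift apart.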

Reservation AR

Please take a look at the implementation of a concrete Aggregate Root (Reservation). The code should be self-explanatory. Command processing consists of validation and raising an event.

ReservationSpec verifies that the Reservation AR is in fact a stateful component, capable of handling the reservation process. The test simply sends several commands to the Reservation AR in a valid order and verifies that the expected events have been persisted. In the middle of the process the Reservation actor is restarted to verify that it preserves its state. And in fact it does, since subsequent commands are handled successfully.

Error handling

By default, if any exception of type java.lang.Exception is thrown by the actor, the actor is restarted by its supervisor (this is defined in the default supervisor strategy). Exceptions are not propagated to the command sender automatically, as you might expect. We can either catch exceptions and send them back to the sender from within the receiveCommand method, or send the exception from within the preRestart method, which takes the exception as its reason argument. Overriding the preRestart method seems to be the simpler approach. Now we can test whether exceptions are returned to the sender: ReservationFailuresSpec
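
As a rough plain-Scala analogue of the first option only (catching the exception inside the command handler and replying with it), consider the sketch below. It does not use Akka and does not show the preRestart approach; the replyTo callback stands in for the sender, and ReserveProduct and CommandFailed are hypothetical names (Acknowledged is modeled here as a plain case object).

```scala
import scala.util.{Failure, Success, Try}

object ErrorReplySketch {
  // Hypothetical command and replies, invented for this example.
  case class ReserveProduct(productId: String, quantity: Int)

  sealed trait Reply
  case object Acknowledged extends Reply
  case class CommandFailed(cause: Throwable) extends Reply

  // Validation throws when an invariant would be violated.
  def validate(command: ReserveProduct): Unit =
    if (command.quantity <= 0)
      throw new IllegalArgumentException("quantity must be positive")

  // The failure is caught inside the handler and sent back to the caller,
  // instead of escaping and being seen only by a supervisor.
  def receiveCommand(command: ReserveProduct, replyTo: Reply => Unit): Unit =
    Try(validate(command)) match {
      case Success(_)     => replyTo(Acknowledged)
      case Failure(cause) => replyTo(CommandFailed(cause))
    }

  def main(args: Array[String]): Unit = {
    receiveCommand(ReserveProduct("p-1", 1), reply => println(reply))
    receiveCommand(ReserveProduct("p-1", 0), reply => println(reply))
  }
}
```

The trade-off is that every command handler has to wrap its logic this way, which is why handling the failure in one place, in preRestart, is attractive.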

In the next lesson...

Currently the client needs to get a reference to a particular instance of an Aggregate Root before sending a command. It would be much easier for the client if it could just send the command to some command gateway. This will be the topic of the next lesson.

7 comments:

  1. Nice one, can't wait to see more of these!

    // Maybe include small snippets of mentioned code in the post, along with links to the file on github?

  2. What I still don't get about this DDD concept is how you deal with 1 billion records.

    Replies
    1. Is your worry about storing events (thus way more records than with "1 record = 1 entity")?
      If so, then the answer is usually snapshots. Akka also has built-in snapshotting, so you wouldn't replay all 1b events, but only those since the latest snapshot (and the snapshot acts like "state of this thing, up to this point in time"). Once you have a snapshot, you could migrate data from "before the snapshot" to another datastore aimed at historical analysis etc.

      I hope this helped a bit :-)
      Feel free to ask here or on the akka-user mailing list about design concerns with DDD / Akka.

  3. Hi Pawel,

    Great articles, nice to read.

    What happens when you need to create an aggregate with many properties, e.g. twenty or thirty properties?

    If I follow your sample code, I end up with aggregate states, and "creation" commands/events that have huge constructors.

    Also, how would one handle the case where a lot of such properties were of no consequence to the business logic, but simply needed to be stored so that they can be used later?

    Thanks,
    Steve

    Replies
    1. Hi Freecloud,
      - first of all you need to decompose your system into subdomains (bounded contexts) to be able to identify and design Aggregate Roots. Not every subdomain needs to be modeled using DDD techniques. DDD should be used for non-trivial (core) domains. Usually a core domain has a limited number of properties.

      - AR should contain only properties that are necessary for constraints validation (ONLY constraints inside boundary of AR) and behavior execution (i.e. no derived properties)

      - changes to AR state can be manifested by change of type of AR state (DraftInvoice -> SentInvoice -> PaidInvoice , all subclasses of Invoice, so no need to keep property indicating current state of invoice)

      - technical properties (AR actor dependencies (i.e. domain service actors)) are separated from business properties (properties of AR state)

      - when the domain is decomposed into multiple ARs, ARs become thin (as dictated by the single responsibility principle)

      - but in an asynchronous environment this means you must embrace eventual consistency

      You can visit https://groups.google.com/forum/#!forum/dddcqrs to find more information on this subject.

    2. Hi Pawel,

      Thanks for your detailed reply. In my case, such an example would be a financial contract. Contracts can contain quite a lot of somewhat peripheral information, and I agree that in terms of a contract aggregate root, only those properties that are required for validation and processing of events should be specified.

      But what to do with the peripheral information - maintain it in a key-value or JSON payload? Other areas of the system may need to access this information (preferably without sending a message to a contract aggregate actor simply to get the value of a couple of properties). Is this a case for maintaining the data within the contract AR through STM or in-memory datagrid?

      Regards,
      Steve
