Introduction
It has been a while since I wrote the last episode of my series "Reactive DDD with Akka". Since then, in 2015, I have released two new projects:
- Akka-DDD - a project that contains reusable artifacts for building applications on top of the Akka platform, following a CQRS/DDDD-based approach,
- ddd-leaven-akka-v2 - a follow-up to the ddd-leaven-akka project that makes use of the Akka-DDD artifacts
Previously we discovered that Akka, with its Akka Persistence module, provides a solid platform for building micro-services using DDD/CQRS design principles and patterns. We learned:
- how to implement event-sourced Aggregate Roots as persistent actors (aka clerks),
- how command dispatching is performed by offices in the standalone and the distributed environment, and
- how the events produced by the Aggregate Roots can be transmitted reliably to other actors.
Further, we noticed that the Akka Persistence lacked support for the query side of the system, and we found out how to overcome this limitation by using an external message broker. We also noticed that if we used the Event Store as the underlying event store, we could reliably feed the query side of the system by running the Event Store Projections (thus avoiding the need to introduce a message broker into the system). I found this idea so interesting that I decided to give it a try, and that's why I started the Akka-DDD project.
In this episode we will learn how to create two kinds of journals using the Event Store Projections mechanism:
- office journals - containing events emitted by a concrete office
- aggregated business process journals - containing events related to a concrete business process
Recently a new version of the Akka Persistence has been released that has Query Support on its feature list. So do we still need to use the Event Store API directly? We will answer this question at the end.
But first, we will begin with an introduction to the Event Store (as a component that we want to integrate with the Akka-DDD) by looking at it from two opposite sides:
- 'the write side' - the Event Store as a journal provider
- 'the read side' - the Event Store as the event bus
The Event Store as a journal provider
The clerk's journal
The EventStore Akka Persistence is a storage plug-in (journal provider) implemented for the Event Store. Under the hood the plug-in uses the Event Store JVM Client. When a persistent actor requests the persisting of an event message, the journal tells the Event Store to write the given event message to a stream with a streamId equal to the actor's persistenceId. If a stream with the given ID does not exist, it is automatically created by the Event Store. The Akka-DDD defines the persistenceId of a persistent actor as a concatenation of officeId and clerkId (separated by a dash), where officeId must be implicitly provided for each Aggregate Root class (via the OfficeId type class) and clerkId is defined as the actor's ID. The [officeId]-[clerkId] is thus the ID of a stream in the Event Store being a journal of a clerk (identified by clerkId) within an office (identified by officeId).
The ID of a persistent and sharded actor is extracted from the first command message it receives.
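To make the naming scheme concrete, here is a minimal sketch in plain Scala. It is not the Akka-DDD code: the OfficeId trait below is a simplified stand-in for the real type class, and the Reservation marker type and the persistenceId helper are hypothetical names used only for illustration.

```scala
// Simplified stand-in for Akka-DDD's OfficeId type class (assumption, not the real definition).
trait OfficeId[A] { def id: String }

// Hypothetical Aggregate Root type; only its office ID matters here.
sealed trait Reservation

object Reservation {
  // The office ID is provided implicitly for the Aggregate Root class.
  implicit val officeId: OfficeId[Reservation] =
    new OfficeId[Reservation] { val id = "Reservation" }
}

object PersistenceIds {
  // persistenceId = [officeId]-[clerkId]; it doubles as the Event Store stream ID.
  def persistenceId[A](clerkId: String)(implicit office: OfficeId[A]): String =
    s"${office.id}-$clerkId"
}
```

Calling PersistenceIds.persistenceId[Reservation]("57d868") yields the stream ID of that clerk's journal.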
The journal entry format
The Event Store requires the following data to create a journal entry:
- EventId - used internally by the Event Store for idempotency
- EventType - an arbitrary string - commonly used for events selection (see Projections)
- Data - actual data in the serialized form (according to the ContentType)
- ContentType - for instance json, binary, etc
- Metadata (optional) - additional data associated with the entry in a serialized form (according to the MetadataContentType)
- MetadataContentType (optional) - for instance json, binary, etc.
Transforming an event to a journal entry
Before a domain event is written to an actor's journal, it is first wrapped by the Akka-DDD in an EventMessage envelope (see: AggregateRoot#raise) and then the EventMessage is wrapped by the Akka in a PersistentRepr envelope. Eventually the journal plug-in is executed to store the PersistentRepr in the actor's journal.
| Event (Domain) → EventMessage (Akka-DDD) → PersistentRepr (Akka) → Journal entry (EventStore plug-in)|
The EventMessage envelope allows adding an arbitrary number of meta attributes. The Akka-DDD takes care of handling the following meta attributes:
- id - the application level message ID (independent from the internal ID used by the Event Store)
- timestamp - the record of the time the event was created at
- causationId - the application level ID of some other message that caused this message (i.e. commandId for the events raised by the clerks)
- correlationId (optional) - the application level ID of a business process, that the event message is associated with
- _deliveryId (optional) - it is used between actors that communicate using At-Least-Once delivery semantics
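The envelope and its meta attributes can be pictured with the following sketch. It is a simplified model written for this article, not the actual Akka-DDD classes: the field layout, the withMetaAttribute helper and the ReservationCreated payload are assumptions.

```scala
import java.time.Instant
import java.util.UUID

object Envelopes {
  // Hypothetical domain event; the real event carries more data.
  final case class ReservationCreated(reservationId: String)

  // Simplified model of the EventMessage envelope (assumption: real fields differ in detail).
  final case class EventMessage(
      event: Any,
      id: String = UUID.randomUUID().toString, // application level message ID
      timestamp: Instant = Instant.now(),      // time the event was created at
      metadata: Map[String, String] = Map.empty) {

    // Returns a copy of the envelope with one more meta attribute.
    def withMetaAttribute(key: String, value: String): EventMessage =
      copy(metadata = metadata + (key -> value))

    def causationId: Option[String]   = metadata.get("causationId")
    def correlationId: Option[String] = metadata.get("correlationId")
  }
}
```

An event raised in response to a command would then be wrapped as EventMessage(event).withMetaAttribute("causationId", commandId), while correlationId stays empty unless the event belongs to a business process.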
The conversion of the PersistentRepr to the journal entry format is performed by a specialized serializer that is automatically registered by the Akka-DDD. The serializer uses the json format, taking advantage of the fact that the Event Store natively supports json. The EventType attribute is set to the name of the event class. The serializer also takes care of serializing the metadata defined in the EventMessage. Here is an example of a journal entry for a ReservationCreated event, read from the Reservation-57d868 stream (office id: Reservation, clerk id: 57d868):
As you can see, although the Data element contains a json representation of an instance of the PersistentImpl class (a class implementing the PersistentRepr trait), the EventType element contains a value that refers to the actual event. The event itself is stored under the payload attribute.
Please notice also the sequenceNr attribute of the PersistentImpl. It is generated by the Akka and represents the position of the entry in the actor's journal. Since the sequenceNr is available only after an event message has been stored, the Akka-DDD distinguishes between the EventMessage (the event message to be stored in the journal) and the OfficeEventMessage (the event message fetched from the journal).
The Event Store as the event bus
In general, the event bus is a layer that allows 'publish - subscribe' style communication between components without requiring the components to explicitly register with one another. So far we have discussed how events get published (written to the journals) from 'the write side' of the system. Now it is time to describe the event subscribers - the actors that want to be notified about the published events. There are two types of event subscribers available in the Akka-DDD:
- View Updaters - responsible for the updating of 'the read side' of the system. They are interested in the events from the particular office journal.
- Receptors - responsible for the event-driven interaction between the subsystems (event choreography), including long-running processes (sagas). They are interested in the events from a particular office journal or a particular aggregated business process journal.
Knowing the ID of a stream, we can use the Event Store API to register a subscriber interested in getting events from that stream. A subscription can be defined as 'live-only' (only new events get pushed to the subscriber) or as 'catch-up'. A 'catch-up' subscription works in a very similar way to a 'live-only' subscription, with one notable difference: the subscriber specifies the position from which events will get pushed. A 'catch-up' subscription thus allows creating durable subscribers. As long as such a subscriber records the position of the last processed event, it can resume processing from that position after it was stopped or terminated and then restarted, e.g. as the result of a system crash. In the next episode, we will learn how to implement the View Updaters and the Receptors using the 'catch-up' subscriptions.
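The idea of a durable subscriber can be sketched without any Event Store dependency. In the model below the stream is an in-memory Vector and all names (Stream, readAfter, DurableSubscriber) are made up for illustration; a real subscriber would use the Event Store client and would persist the recorded position instead of keeping it in a variable.

```scala
object CatchUpSketch {
  // In-memory stand-in for an Event Store stream: the index in the Vector is the event position.
  final case class Stream(events: Vector[String]) {
    // Catch-up read: everything strictly after the given position (None = from the start).
    def readAfter(position: Option[Int]): Vector[(Int, String)] =
      events.zipWithIndex.map(_.swap).drop(position.fold(0)(_ + 1))
  }

  // A durable subscriber records the position of the last processed event.
  final class DurableSubscriber {
    var lastPosition: Option[Int] = None   // kept in memory here; persisted in a real system
    var processed: Vector[String] = Vector.empty

    // Processes only the events that were not seen before and advances the position.
    def catchUp(stream: Stream): Unit =
      stream.readAfter(lastPosition).foreach { case (pos, event) =>
        processed :+= event
        lastPosition = Some(pos)
      }
  }
}
```

Calling catchUp again after new events were appended processes only the new ones, which is exactly what lets a restarted subscriber continue where it left off.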
But now let's see how we can create new streams using the Event Store Projections. A projection definition consists of:
- The identifier(s) of input event stream(s)
- The event(s) selection(s)
- The function(s) that accept an event and a state as parameters. A function can call the linkTo(streamId, event) function to write the input event into an arbitrary stream, or it can call the emit(streamId, eventType, event) function to emit a new event into an arbitrary stream.
Let's see an example. The projection below will watch the built-in $stats-[ip:host] stream, containing low level system statistics, for events of the type $statsCollected and will emit a new event (of the type heavyCpuFound) to the new heavycpu stream whenever the value of the sys-cpu statistic, read from the caught event, exceeds 40:
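Projections themselves are written in JavaScript and run inside the Event Store. The Scala snippet below is therefore not a projection definition, only a rough model of the logic just described; the Event and Emitted types and the "level" attribute of the emitted event are assumptions.

```scala
object HeavyCpuSketch {
  // Simplified event shapes (assumption: the real statistics events carry many more fields).
  final case class Event(eventType: String, body: Map[String, Double])
  final case class Emitted(streamId: String, eventType: String, body: Map[String, Double])

  // Mirrors the described projection: select $statsCollected events and
  // emit heavyCpuFound into the heavycpu stream when sys-cpu exceeds 40.
  def project(input: Seq[Event]): Seq[Emitted] =
    input.collect {
      case Event("$statsCollected", body) if body.get("sys-cpu").exists(_ > 40) =>
        Emitted("heavycpu", "heavyCpuFound", Map("level" -> body("sys-cpu")))
    }
}
```

Events of other types, and statistics with sys-cpu at or below 40, produce no output.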
Creating an office journal
We learned that the stream with ID Reservation-57d868 represents the journal of the clerk 57d868 working in the Reservation office. Now we want to create a journal that contains the events emitted by all the clerks working in the Reservation office. To accomplish this, we need to use the $by_category system (built-in) projection. It turns out that the Event Store is able to extract the category of a stream from its ID (treating the dash (-) as a category separator). The $by_category projection, once enabled (all system projections are disabled by default), will detect the Reservation category and will automatically create a $ce-Reservation journal for the Reservation office. Similarly, it will create the appropriate office journals for all the other offices that already exist in the system or are created in the future (all the system projections run in continuous mode, so we don't need to restart them).
Creating an aggregated business process journal
Now that we know how to create the office journals, we can use them as input streams for the aggregated business process journals. For example, let's create an invoicing journal that will contain all the events related to the invoicing business process:
This time we have defined the journals of two offices (one of them being Invoice) as input streams. Then, for each type of event relevant to the invoicing business process, we have defined a function that simply "inserts" the original event into the invoicing stream.
In fact, when using the linkTo function, the event inserted into the output stream is not the original event (or its copy), but a special link event containing only a pointer to the original event.
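The difference between copying an event and linking to it can be modelled as follows. This is a plain Scala illustration, not Event Store code: the Recorded and Link types are made up, the event type names in relevant are hypothetical, and the pointer is written as eventNumber@streamId to show that only a reference to the original event is stored.

```scala
object LinkToSketch {
  // An event as recorded in its original stream (simplified).
  final case class Recorded(streamId: String, eventNumber: Int, eventType: String, data: String)

  // A link event stores only a pointer to the original event, not its data.
  final case class Link(targetStream: String, pointer: String)

  def linkTo(targetStream: String, e: Recorded): Link =
    Link(targetStream, s"${e.eventNumber}@${e.streamId}")

  // Hypothetical selection of the event types relevant to the invoicing process.
  val relevant: Set[String] = Set("ReservationConfirmed", "OrderBilled")

  // Links every relevant input event into the invoicing stream.
  def project(input: Seq[Recorded]): Seq[Link] =
    input.filter(e => relevant(e.eventType)).map(linkTo("invoicing", _))
}
```

Because the output stream holds pointers only, the event data is stored once, in the journal of the clerk that produced it.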
Please notice that the invoicing stream does not represent a journal of a concrete instance of the invoicing business process (an invoicing process for a concrete customer/order). Once we learn about the Coordination Office (in the upcoming episode of the series) we will also learn how a special Receptor takes care of forwarding the events read from the invoicing stream to the Invoicing Coordination Office, which in turn forwards/routes them to the concrete clerks responsible for managing the single business process instances. The clerks then decide which events to store in their own journals - the journals representing the single process instances.
Reading events from a journal
It is time to learn how to implement a durable event subscriber using the Akka-DDD framework. The trait we will need to use is EventSourceProvider, located in the eventstore-akka-persistence module. The trait exposes the eventSource(esConnection, observable, fromPositionExclusive): Source[EventMessageRecord, Unit] method. As the method's signature suggests, it accepts some observable object (next to the Event Store connection object and the start position) and returns an object that is a source of the EventMessageEntry objects. The Source class is Akka's representation of a Publisher as defined by the Reactive Streams standard.
The eventSource method takes an observable BusinessEntity and obtains the streamId from it by calling the StreamIdResolver. The StreamIdResolver knows how to resolve a streamId regardless of whether the given entity is a clerk, an office or a coordination office. The method then uses the obtained streamId to create a Publisher by calling the streamPublisher(streamId, position, ...) method provided by the EventStore JVM Client Reactive Streams API. Finally, the method converts the Publisher object to a Source object that is instructed to emit the event messages (discussed previously) wrapped into an EventMessageEntry.
The Akka-DDD makes use of the EventSourceProvider trait to implement two types of durable subscribers: the View Update Service and the Receptor. We will not dive into the implementation details of these services in this article, but as you can imagine, stream processing is the preferred pattern used there.
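The stream ID resolution step can be sketched as a simple pattern match. The types below are not the Akka-DDD classes, only a model built from the naming rules described earlier in this article; in particular, mapping a coordination office to the stream named after its business process is an assumption.

```scala
object StreamIdSketch {
  // Simplified model of the observable entities (hypothetical types).
  sealed trait BusinessEntity
  final case class Clerk(officeId: String, clerkId: String) extends BusinessEntity
  final case class Office(officeId: String) extends BusinessEntity
  final case class CoordinationOffice(processId: String) extends BusinessEntity

  def streamId(entity: BusinessEntity): String = entity match {
    case Clerk(officeId, clerkId)      => s"$officeId-$clerkId" // the clerk's journal
    case Office(officeId)              => "$ce-" + officeId     // the office journal
    case CoordinationOffice(processId) => processId             // assumption: the aggregated process journal
  }
}
```

Whatever the entity, the subscriber ends up with a single stream ID that it can pass to the Event Store client together with the position to resume from.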
The Akka Persistence Query
As stated in the docs, since version 2.4 the Akka Persistence provides a universal, asynchronous, stream-based query interface that various journal plug-ins can implement in order to expose their query capabilities. The interface exposes the ReadJournal trait family that provides two groups of methods for reading events from the journal:
The methods that return a source that emits the historical events:
As you can see, the interface supports not only queries for events from a single journal but also queries for "tagged" events from an arbitrary number of journals.
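To give a feeling for the two kinds of queries, here is a dependency-free model of a read journal. The method names follow Akka's currentEventsByPersistenceId and currentEventsByTag, but everything else is simplified: a List stands in for the Akka Streams Source, the offset parameter of the tag query is left out, and the Stored type is made up for this sketch. The live counterparts (eventsByPersistenceId, eventsByTag) are not modelled.

```scala
object ReadJournalSketch {
  // A stored event together with the tags attached to it (hypothetical type).
  final case class Stored(persistenceId: String, sequenceNr: Long, event: String, tags: Set[String])

  // Simplified model: List stands in for the Source returned by the real API.
  final class InMemoryReadJournal(entries: List[Stored]) {

    // Historical events of a single journal, within a range of sequence numbers.
    def currentEventsByPersistenceId(
        persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): List[Stored] =
      entries.filter { e =>
        e.persistenceId == persistenceId &&
        e.sequenceNr >= fromSequenceNr && e.sequenceNr <= toSequenceNr
      }

    // Historical events carrying the given tag, regardless of the journal they come from.
    def currentEventsByTag(tag: String): List[Stored] =
      entries.filter(_.tags.contains(tag))
  }
}
```

The first query is bound to one persistenceId, while the second one cuts across journals, which is what makes tags a candidate for identifying office or business process journals.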
Some journal plug-ins may support the EventsByTag queries out of the box by requiring events to be wrapped in an akka.persistence.journal.Tagged before they get written to the journal (such wrapping could be implemented using Event Adapters). Other plug-ins may treat tags as identifiers of arbitrary event journals, such as office journals or business process journals. These journals could be managed externally (for example, using the Projections in the case of the Event Store, as we have seen above).
Going back to the Akka-DDD, would it be possible to use the Akka Persistence Query instead of the EventStore JVM Client and thus gain more interoperability? Well, currently this is not possible, because the EventStore Akka Persistence plug-in supports only the queries for the events from a single journal (the EventsByTag queries are not supported). So, unfortunately, the following code will not work: