Introduction
It has been a while since I wrote the last episode in my series "Reactive DDD with Akka". In that time, in 2015, I managed to release two new projects:
- Akka-DDD - a project that contains reusable artifacts for building applications on top of the Akka platform, following a CQRS/DDDD-based approach,
- ddd-leaven-akka-v2 - a follow-up to the ddd-leaven-akka project that makes use of the Akka-DDD artifacts.
Previously we discovered that the Akka, with its Akka Persistence module, provides a solid platform for building micro-services using DDD/CQRS design principles and patterns. We learned:
- how to implement event-sourced Aggregate Roots as persistent actors (aka clerks) [1],
- how the command dispatching is performed by offices in the standalone and the distributed environment and
- how the events produced by the Aggregate Roots can be transmitted reliably to other actors.
Further, we noticed the lack of support for the query side of the system in the Akka Persistence, and we found out how to overcome this limitation by using an external message broker. We also noticed that if we used the Event Store as the underlying event store, we could reliably feed the query side of the system by executing the Event Store Projections (thus avoiding the need to introduce a message broker into the system). In my view, this idea was so interesting that I decided to give it a try, and that's why I started the Akka-DDD project.
In this episode we will learn how to create two kinds of journals using the Event Store Projections mechanism:
- office journals - containing events emitted by a concrete office
- aggregated business process journals - containing events related to a concrete business process
Recently a new version of the Akka Persistence has been released that contains Query Support on its feature list. So do we still need to use the Event Store API directly? We will answer this question at the end.
First, however, we will begin with an introduction to the Event Store (as a component that we want to integrate with the Akka-DDD) by looking at it from two opposite sides:
- 'the write side' - the Event Store as a journal provider
- 'the read side' - the Event Store as the event bus
The Event Store as a journal provider
The clerk's journal
The EventStore Akka Persistence is a storage plug-in (journal provider) implemented for the Event Store. Under the hood, the plug-in uses the Event Store JVM Client. When a persistent actor requests the persisting of an event message, the journal tells the Event Store to write the given event message to a stream with a streamId equal to the actor's persistenceId. If a stream with the given ID does not exist, it is automatically created by the Event Store. The Akka-DDD defines the persistenceId of a persistent actor as a concatenation of officeId and clerkId (separated by a dash), where officeId must be implicitly provided for each Aggregate Root class (via the OfficeId type class) and clerkId is defined as the actor's ID [2]. The [officeId]-[clerkId] is thus the ID of a stream in the Event Store that is the journal of a clerk (identified by clerkId) within an office (identified by officeId).

[2] The ID of a persistent and sharded actor is extracted from the first command message it receives.
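The naming rule described above can be sketched in a few lines of plain Scala. This is only an illustration under stated assumptions: the OfficeId trait below is a simplified stand-in for the type class mentioned in the text, and Reservation and StreamIds are hypothetical names, not the actual Akka-DDD definitions.

```scala
// Simplified stand-in for the OfficeId type class (assumption: the real
// Akka-DDD definition may look different).
trait OfficeId[A] { def id: String }

// Placeholder Aggregate Root class, used only for this illustration.
final class Reservation
object Reservation {
  // The office ID is provided implicitly for the Aggregate Root class.
  implicit val officeId: OfficeId[Reservation] =
    new OfficeId[Reservation] { val id = "Reservation" }
}

object StreamIds {
  // persistenceId = officeId + "-" + clerkId; the same value is used as the stream ID.
  def persistenceId[A](clerkId: String)(implicit office: OfficeId[A]): String =
    s"${office.id}-$clerkId"
}
```

Under these assumptions, StreamIds.persistenceId[Reservation]("57d868") evaluates to Reservation-57d868.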
The journal entry format
The Event Store requires the following data to create a journal entry:
- EventId - used internally by the Event Store for idempotency
- EventType - an arbitrary string - commonly used for events selection (see Projections)
- Data - actual data in the serialized form (according to the ContentType)
- ContentType - for instance json, binary, etc
- Metadata (optional) - additional data associated with the entry in a serialized form (according to the MetadataContentType)
- MetadataContentType (optional) - for instance json, binary, etc.
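The fields listed above could be modelled roughly as follows. This is a hypothetical data structure written for illustration only; it is not the type used by the Event Store JVM Client, and the sample payload is made up.

```scala
import java.util.UUID

// Hypothetical model of the data needed to create a journal entry.
final case class JournalEntry(
  eventId: UUID,                              // used by the Event Store for idempotency
  eventType: String,                          // arbitrary string, commonly used for event selection
  data: String,                               // serialized data (according to contentType)
  contentType: String,                        // e.g. "json" or "binary"
  metadata: Option[String] = None,            // optional serialized metadata
  metadataContentType: Option[String] = None  // optional, e.g. "json" or "binary"
)

object JournalEntryExample {
  // Illustrative values only; the payload below is not taken from a real journal.
  val entry: JournalEntry = JournalEntry(
    eventId = UUID.randomUUID(),
    eventType = "ReservationCreated",
    data = """{"reservationId":"57d868"}""",
    contentType = "json"
  )
}
```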
Transforming an event to a journal entry
Before a domain event is written to an actor's journal, it is first wrapped by the Akka-DDD in an EventMessage envelope (see: AggregateRoot#raise), and then the EventMessage is wrapped by the Akka in a PersistentRepr envelope. Eventually the journal plug-in is executed to store the PersistentRepr in the actor's journal.

Event (Domain) → EventMessage (Akka-DDD) → PersistentRepr (Akka) → Journal entry (EventStore plug-in)
The EventMessage envelope allows adding an arbitrary number of meta attributes. The Akka-DDD takes care of handling the following meta attributes:
- id - the application level message ID (independent of the internal ID used by the Event Store)
- timestamp - the time at which the event was created
- causationId - the application level ID of the message that caused this message (i.e. the commandId for the events raised by the clerks)
- correlationId (optional) - the application level ID of the business process that the event message is associated with
- _deliveryId (optional) - used between actors that communicate using At-Least-Once delivery semantics
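The wrapping chain and the meta attributes can be illustrated with simplified stand-in classes. Everything in this sketch is an assumption made for illustration: EventMessage here is a reduced model of the Akka-DDD envelope, PersistentEnvelope stands in for Akka's PersistentRepr, and the ReservationCreated field is hypothetical.

```scala
import java.time.Instant
import java.util.UUID

// Hypothetical domain event (the field is made up for this example).
final case class ReservationCreated(reservationId: String)

// Reduced model of the EventMessage envelope: id, timestamp and
// an open-ended map of further meta attributes.
final case class EventMessage(
  event: Any,
  id: String = UUID.randomUUID().toString,
  timestamp: Instant = Instant.now(),
  metaData: Map[String, Any] = Map.empty
) {
  def withMetaAttribute(name: String, value: Any): EventMessage =
    copy(metaData = metaData + (name -> value))
}

// Stand-in for Akka's PersistentRepr (not the real trait).
final case class PersistentEnvelope(payload: Any, sequenceNr: Long, persistenceId: String)

object Envelopes {
  // Event (Domain) -> EventMessage -> PersistentEnvelope
  def wrap(event: Any, causationId: String, persistenceId: String, sequenceNr: Long): PersistentEnvelope = {
    val message = EventMessage(event).withMetaAttribute("causationId", causationId)
    PersistentEnvelope(message, sequenceNr, persistenceId)
  }
}
```

In this sketch the causationId is passed in explicitly; in the real framework it would be derived from the command that the clerk is handling.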
The conversion of the PersistentRepr to the journal entry format is performed by a specialized serializer that is automatically registered by the Akka-DDD. The serializer uses the JSON format, taking advantage of the fact that the Event Store natively supports JSON. The EventType attribute is set to the name of the event class. The serializer also takes care of serializing the metadata defined in the EventMessage.

Consider a journal entry for a ReservationCreated event, read from the Reservation-57d868 stream (office id: Reservation, clerk id: 57d868). Although the Data element contains a JSON representation of an instance of the PersistentImpl class (a class implementing the PersistentRepr trait), the EventType element contains a value that refers to the actual event. The event itself is stored under the payload attribute of the PersistentImpl object.

Please notice also the sequenceNr attribute of the PersistentImpl. It is generated by the Akka and represents the position of the entry in the actor's journal. Since the sequenceNr is available only after an event message has been stored, the Akka-DDD distinguishes between the EventMessage (the event message to be stored in the journal) and the OfficeEventMessage (the event message fetched from the journal).

The Event Store as the event bus
In general, the event bus is a layer that allows 'publish - subscribe' style communication between components without requiring the components to explicitly register with one another. So far we have discussed how events get published (written to the journals) from 'the write side' of the system. Now it is time to describe the event subscribers - the actors that want to be notified about the published events. There are two types of event subscribers available in the Akka-DDD:
- View Updaters - responsible for updating 'the read side' of the system. They are interested in the events from a particular office journal.
- Receptors - responsible for the event-driven interaction between the subsystems (event choreography), including long-running processes (sagas). They are interested in the events from a particular office journal or a particular aggregated business process journal.
Knowing the ID of a stream, we can use the Event Store API to register a subscriber interested in getting events from that stream. A subscription can be defined as 'live-only' (only new events get pushed to the subscriber) or as 'catch-up'. A 'catch-up' subscription works in a very similar way to a 'live-only' subscription, with one notable difference: the subscriber specifies the position from which events will get pushed. A 'catch-up' subscription thus allows creating durable subscribers. Such subscribers can resume processing events after they have been stopped or terminated and then restarted (e.g. as the result of a system crash), as long as they are able to record the position of the last processed event. In the next episode, we will learn how to implement the View Updaters and the Receptors using the 'catch-up' subscriptions.
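The idea behind a durable subscriber can be shown with a small in-memory simulation. This sketch does not use the Event Store API at all: StoredEvent, PositionStore and CatchUp are hypothetical names, and the stream is just a sequence of events with positions.

```scala
// In-memory stand-in for an event read from a stream (not an Event Store type).
final case class StoredEvent(position: Long, payload: String)

// Hypothetical store for the last processed position of each subscriber.
// A real subscriber would keep this in durable storage.
final class PositionStore {
  private var positions = Map.empty[String, Long]
  def load(subscriberId: String): Option[Long] = positions.get(subscriberId)
  def save(subscriberId: String, position: Long): Unit =
    positions = positions.updated(subscriberId, position)
}

object CatchUp {
  // Pushes every event after the recorded position to `handle` and
  // records the new position after each event has been processed.
  def run(subscriberId: String, stream: Seq[StoredEvent], store: PositionStore)(
      handle: StoredEvent => Unit): Unit = {
    val lastProcessed = store.load(subscriberId)
    stream
      .filter(event => lastProcessed.forall(event.position > _))
      .foreach { event =>
        handle(event)
        store.save(subscriberId, event.position)
      }
  }
}
```

Calling CatchUp.run a second time with the same PositionStore simulates a restart: only the events stored after the recorded position are handled again. Because the position is saved after the handler runs, a crash between the two steps would cause the last event to be redelivered, so handlers in such a design should tolerate duplicates.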
But now let's see how we can create new streams using the Event Store Projections.
Creating streams using the Event Store Projections
The Event Store is able to execute a built-in or a user-defined projection - a chunk of JavaScript code containing the following elements:
- The identifier(s) of the input event stream(s)
- The event selection(s)
- The function(s) that accept an event and a state as parameters. The function can call the linkTo(streamId, event) function to write the input event into an arbitrary stream, or it can call the emit(streamId, eventType, event) function to emit a new event into an arbitrary stream.
Let's see an example. Consider a projection that watches the built-in $stats-[ip:host] stream, containing low level system statistics, for events of the type $statsCollected, and emits a new event (of the type heavyCpuFound) to a new heavycpu stream whenever the value of the sys-cpu statistic, read from the caught event, exceeds 40.

Creating an office journal
We learned that the stream with the ID Reservation-57d868 represents the journal of the clerk 57d868 working in the Reservation office. Now we want to create a journal that contains the events emitted by all clerks working in the Reservation office. To accomplish this, we need to use the $by_category system (built-in) projection. It turns out that the Event Store is able to extract the category of a stream from its ID (treating the dash (-) as a category separator). The $by_category projection, once enabled (all system projections are disabled by default), will detect the Reservation category and will create a $ce-Reservation journal for the Reservation office automatically. Similarly, it will create appropriate office journals for all the other offices already existing in the system or created in the future (all the system projections run in continuous mode, so we never need to restart them).

Creating an aggregated business process journal
Now that we know how to create the office journals, we can use them as input streams for the aggregated business process journals. For example, let's create an invoicing journal that will contain all the events related to the invoicing business process. The projection takes the journals of two offices (Reservation and Invoice) as input streams. Then, for each type of event relevant to the invoicing business process, it defines a function that simply "inserts" the original event into the invoicing stream [3].

[3] In fact, when using the linkTo function, the event inserted into the output stream is not the original event (or its copy), but a special link event containing only a pointer to the original event.

Please notice that the invoicing stream does not represent a journal of a concrete instance of the invoicing business process (an invoicing process for a concrete customer/order). Once we learn about the Coordination Office (in an upcoming episode of the series), we will also learn how a special Receptor takes care of forwarding the events read from the invoicing stream to the Invoicing Coordination Office, which in turn forwards/routes them to the concrete clerks responsible for managing the single business process instances. The clerks then decide which events to store in their own journals - the journals representing the single process instances.

Reading events from a journal
It is time to learn how to implement a durable event subscriber using the Akka-DDD framework. The trait we will need to use is EventSourceProvider, located in the eventstore-akka-persistence module. The trait exposes the eventSource(esConnection, observable, fromPositionExclusive): Source[EventMessageRecord, Unit] method. As the method's signature suggests, it accepts an observable object (along with the Event Store connection object and the start position) and returns an object that is a source of the EventMessageEntry objects. The Source class is Akka's representation of a Publisher as defined by the Reactive Streams standard.

The eventSource method takes an observable BusinessEntity and obtains the streamId from it by calling the StreamIdResolver. The StreamIdResolver knows how to resolve a streamId regardless of whether the given entity is a clerk, an office or a coordination office. The method then uses the obtained streamId to create a Publisher by calling the streamPublisher(streamId, position, ...) method provided by the EventStore JVM Client Reactive Streams API. Finally, the method converts the Publisher object to a Source object that is instructed to emit the event messages (discussed previously) wrapped in an EventMessageEntry envelope.

The Akka-DDD makes use of the EventSourceProvider trait to implement two types of durable subscribers: the View Update Service and the Receptor. We will not dive into the implementation details of these services in this article, but as you can imagine, stream processing is the preferred pattern used there.

The Akka Persistence Query
As stated in the docs, since version 2.4 the Akka Persistence provides a universal, asynchronous, stream-based query interface that various journal plug-ins can implement in order to expose their query capabilities. The interface exposes the ReadJournal trait family that provides two groups of methods for reading events from the journal:

The methods that return a source that emits the historical events:
- currentEventsByPersistenceId(id)
- currentEventsByTag(tag)

The methods that return a source that emits the historical events and then continues to emit new events as they are stored:
- eventsByPersistenceId(id)
- eventsByTag(tag)
As you can see, the interface supports not only queries for events from a single journal but also queries for "tagged" events from an arbitrary number of journals.
Some journal plug-ins may support the EventsByTag queries out of the box by requiring events to be wrapped in an akka.persistence.journal.Tagged before they get written to the journal (such wrapping could be implemented using Event Adapters). Other plug-ins may treat tags as identifiers of arbitrary event journals, such as office journals or business process journals. These journals could be managed externally (for example, using the Projections in the case of the Event Store, as we have seen above).

Going back to the Akka-DDD, would it be possible to use the Akka Persistence Query instead of the EventStore JVM Client and thus gain more interoperability? Well, currently this is not possible, because the EventStore Akka Persistence plug-in supports only queries for the events from a single journal (the EventsByTag queries are not supported). So the following code will not work unfortunately: