4 November 2016

Reactive DDD with Akka - Reactive business processes

Introduction

In the previous post we learned how to implement a sample ordering service using the Akka-DDD framework. The service exposed the Reservation Office responsible for preparing and confirming the reservation of products, that the client added to their shopping cart. We learned that an office is a command handling business entity whose sole responsibility is to validate and process the commands. If the command validation succeeds, an appropriate event is written to the office journal.

The Reservation Office alone is obviously not able to execute the whole Ordering Process engaging activities like payment processing, invoicing and goods delivery. Therefore, to extend the functionality of the system we need to introduce two new subsystems / services: Invoicing and Shipping, that will be hosting Invoicing Office and Shipping Office respectively.

The question arises, how to employ multiple offices to work on a business process and coordinate the workflow, so that the process is executed as defined by the business. To answer this question, we will first learn how to model a business process in a distributed system. Then we will review the implementation of a sample Ordering Process developed under the ddd-leaven-akka-v2 project. Finally we will see how Akka-DDD facilitates the implementation of business processes in a distributed system.

Business processes and SOA

To deliver a business value, an enterprise needs to perform its activities in a coordinated manner. Regardless of whether it is a production line or a decision chain, activities needs to be performed in a specific order accordingly to the rules defined by the business. The business process thus defines precisely what activities, in which order and under which conditions need to be performed, so that the desired business goal gets achieved.

The coordination of the activities implies the information exchange between the collaborators. In the past, business processes were driven by the paper documents flying back and forth between process actors. Nowadays, when more and more activities get performed by computers and machines, the business process execution is realized as a message flow between services. Unfortunately though, for a variety of reasons, the implementation of the Service-Oriented Architecture (SOA), very often ended up with a Big Ball of Mud as well as scalability and reliability issues (just to name a few) in the runtime.
The recent move towards the SOA 2.0 / Event-driven SOA / "SOA done Right" / Microservices (choose your favorite buzzword) enables delivering more light-weight, reliable / fault-tolerant and scalable SOA implementations. When it comes to modeling and executing business processes, the key realization is that since business processes are event-driven, the same events that get written to the office journals could be used to trigger the start or the continuation of business processes. If so, any business process can be implemented as an event-sourced actor (called Process Manager) assuming it gets subscribed to a single stream of events (coming from an arbitrary number of offices) it is interested in. Once an event is received, the Process Manager executes an action, usually by sending a command to an office, and updates its state by writing the event to its journal. In this way, the Process Manager coordinates the work of the offices within the process. What is important is that the logic of a business process is expressed in terms of incoming events, state transitions and outgoing commands. This seems to be a quite powerful domain specific language for describing business processes. Let's take a look at the definition of an sample Ordering Process, that is written using the DSL offered by the Akka-DDD:

Ordering Process - definition

    startWhen {

        case _: ReservationConfirmed => New

    } andThen {

        case New => {

          case ReservationConfirmed(reservationId, customerId, totalAmount) =>
            WaitingForPayment {
              ⟶ (CreateInvoice(processManagerId, reservationId, customerId, totalAmount, now()))
              ⟵ (PaymentExpired(processManagerId, reservationId)) in 3.minutes
            }
        }

        case WaitingForPayment => {

          case PaymentExpired(invoiceId, orderId) =>
              ⟶ (CancelInvoice(invoiceId, orderId))

          case OrderBilled(_, orderId, _, _) =>
            DeliveryInProgress {
              ⟶ (CloseReservation(orderId))
              ⟶ (CreateShipment(UUID(), orderId))
            }

          case OrderBillingFailed(_, orderId) =>
            Failed {
              ⟶ (CancelReservation(orderId))
            }
        }

    }
An order is represented as a process that is triggered by the ReservationConfirmed event published by the Reservation Office. As soon as the order is created, the CreateInvoice command is issued to the Invoicing Office and the status of the order is changed to WaitingForPayment. If the payment succeeds (the OrderBilled event is received from the Invoicing office within 3 minutes) the CreateShipment command is issued to the Shipping Office and the status of the order is changed to DeliveryInProgress. But, if the scheduled timeout message PaymentExpired is received while the order is still not billed, the CancelInvoice command is issued to the Invoicing Office and eventually the process ends with a Failed status.
I hope you agree, that the logic of the Ordering Process is easy to grasp by looking at the code above. We simply declare a set of state transitions with associated triggering events and resulting commands. Please note that ⟵ (PaymentExpired(...)) in 3.minutes gets resolved to the following command: (⟶ ScheduleEvent(PaymentExpired(...), now + 3.minutes)) that will be issued to the specialized Scheduling Office.

The Saga pattern

As the business process participants are distributed and communicate asynchronously (just like the human actors in the real world!) the only way to deal with a failure is to incorporate it into the business process logic. If a failure happens (a command rejected by the office, a command not processed at all (office stopped), an event not received within the configured timeout), the counteraction, called compensation, must be executed. For example, the creation of an invoice is compensated by its cancellation (see the Ordering Process above). Following this rule, we break the long running conversation (the business process) into multiple smaller actions and counteractions that can be coordinated in the distributed environment without the global / distributed transaction.

This pattern for reaching the distributed consensus without a distributed transaction is called the Saga pattern and was first introduced by the Hector Garcia-Molina in the 1987. A Saga pattern can be implemented with or without the central component (coordinator) (see: Orchestration vs. Choreography). The implementation of the Ordering Process follows the Orchestration pattern - the Ordering Process is managed by an actor, that is external to all process participants.

Process Managers and the Coordination Office

The execution of the logic of a particular business process instance is handled by the Process Manager actor. The Process Manager is a stateful / event-sourced actor, just like the regular Aggregate Root actor, except it receives events instead of commands. Just like the Aggregate Root actors, Process Manager actors work in the offices. Both the Command Office (hosting Aggregate Roots) and the Coordination Office (hosting Process Managers) can be started using the OfficeFactory#office method:
    office[Scheduler]           // start Scheduling Command Office

    office[OrderProcessManager] // start Ordering Coordination Office

Let the events flow

The Coordination Office is expected to correlate the received events with the business process instances using the CorrelationID meta-attribute of the event. Therefore to stream the events from the event store to a particular Coordination Office we need to create a Source (see: Reading events from a journal) emitting only these events that:
1) belong to the domain of the particular business process,
2) were assigned the CorrelationID meta-attribute.
One way to address the first requirement is to create a journal (aggregated business process journal), that aggregates the events belonging to the domain of a particular business process. Some Akka Persistence journal providers may support the automatic creation of journals that group events by tags. Unfortunately, this functionality is not yet supported by the Event Store plugin, that is used by the Akka-DDD. Luckily though, using the Event Store projection mechanism, we can create the journal of the Ordering Process by activating the following projection:
fromStreams(['$ce-Reservation', '$ce-Invoice', 'currentDeadlines-global']).
    when({
        'ecommerce.sales.ReservationConfirmed' : function(s,e) {
            linkTo('order', e);
        },
        'ecommerce.invoicing.OrderBilled' : function(s,e) {
            linkTo('order', e);
        },
        'ecommerce.invoicing.OrderBillingFailed' : function(s,e) {
            linkTo('order', e);
        },
        'ecommerce.invoicing.PaymentExpired' : function(s,e) {
            linkTo('order', e);
        }
    });
Now, the events from the order journal need to be assigned the CorrelationID and directed to the responsible Coordination Office. What is important and challenging though, is to ensure that the events get delivered in a reliable manner.

Reliable events propagation

By reliable delivery I mean effectively-once delivery, which takes place when:
  • message is delivered at least once,
  • message is processed (by the destination actor) exactly-once,
  • messages get processed (by the destination actor) in the order they were stored in the source journal.
Effectively-once delivery (a.k.a. The Business Handshake Pattern) can easily be accomplished if the sender keeps track of the "in delivery" messages and the receiver keeps track of the processed messages. The implementation of the pattern becomes straightforward if both the sender and the receiver are event-sourced actors.
For more detailed explanation of how the reliable delivery gets achieved, please refer to the Reliable Delivery Akka-DDD wiki page.
In our scenario, we already have the event-sourced Process Manager on the receiving side (the Coordination Office is just a transparent proxy). The missing component is the event-sourced actor on the sending side. As it is an infrastructure level component, it will automatically be created by the Akka-DDD framework, just after the Coordination Office gets started. The overall behavior of the actor that needs to be created matches the concept of the Receptor — a sensor that reacts to a signal (stimulus), transforms it and propagates (stimulus transduction). The Akka-DDD provides the implementation of the Receptor that supports reliable event propagation, including the back-pressure mechanism.

Receptor

A Receptor gets created by the factory method based on the provided configuration object. The configuration can be built using the simple Receptor DSL.
Receptor actor is a durable subscriber. During the initialization, it is subscribing (by itself) to the event journal of the business entity that was provided as the stimuliSource in the configuration object. After the event has been received and stored in the receptor journal, the transformation function gets called and the result gets sent to the configured receiver. If the receiver address is to be obtained from an event, that gets propagated, then the receiverResolver function should be provided in the configuration.
One might question the fact that the same events get written twice into the event store (first time to the office journal, second time to the receptor journal). I would like to clarify, that in fact this does not happen. The receptor is by default configured to use an in-memory journal and the only messages that get persisted are the snapshots of the receptor state. The snapshots get written to the snapshot store on a regular basis (every n events, where n is configurable) and contain the messages awaiting the delivery receipt.

Coordination Office Receptor

Having learned how to build a receptor, it should be easy to understand the behavior of the Coordination Office receptor by examining its configuration. As we can see, the receptor reacts to the events, coming from the aggregated process journal, adds the CorrelationID meta-attribute to the event message and propagates the event message to the Coordination Office representative actor. The name of the aggregated process journal and the CorrelationID resolver function get retrieved from the ProcessConfig object — an implicit parameter of the office factory method.
To summarize, the Coordination Office receptor gets automatically created by the Akka-DDD framework, based on the configuration of the business process.
So, let's take a look at the OrderProcessConfiguration:
  implicit object OrderProcessConfig extends ProcessConfig[OrderProcessManager]("order", department) {
    def correlationIdResolver = {
      case ReservationConfirmed(reservationId, _, _) => reservationId // orderId
      case OrderBilled(_, orderId, _, _) => orderId
      case OrderBillingFailed(_, orderId) => orderId
      case PaymentExpired(_, orderId) => orderId
    }
  }
The aggregated process journal is given the name: order. The correlationIdResolver function returns the CorrelationID from the orderId attribute of the event, for all events, except the ReservationConfirmed. For the ReservationConfirmed event, the CorrelationID / order ID must be generated, because the Ordering Process is not yet started, while the ReservationConfirmed event is being processed.

Message flow - the complete picture

After the initial event has been received and processed by the Process Manager, the business process instance gets started. The business process will continue, driven by the events. The events will be emitted as soon as the commands, issued by the Process Manager, get processed in the responsible Command Offices.
The following diagram visualizes the flow of the commands and events within the system that occurs when the business process is running.

Business process journal

Before reacting to an event, the Process Manager writes the event to its journal. The journal of a Process Manager is de facto a journal of a business process instance - it keeps the events related to particular business process instance in order they were processed by the Process Manager.

Ordering Process - an alternative implementation (Event Choreography)

Instead of giving the responsibility for the business process execution to the single, external business entity (the Ordering Process is managed by a Process Manager, operating in the Headquarters subsystem), we could let the process emerge by allowing more direct communication between the offices. For example, to send the CreateShipment command to the Shipping Office in reaction to the OrderBilled event coming from the Invoicing Office we could register a simple Receptor in the Shipping subsystem. In the same way, we could implement other interactions required by the Ordering Process and thus make the Headquarters subsystem obsolete.
The overall event flow could be modeled as presented on the following diagram:
We can find this alternative approach to the implementation of the Ordering Process in the previous version of the ddd-leaven-akka-v2 project. The Headquarters subsystem did not exist back then. In the Shipping subsystem, you can find the implementation of the Payment Receptor, that I described above.

Summary

Once we start modeling the interactions that shape our business domain, using the language of the commands and the events, we are on a good way towards creating a model of a system that is simple to understand by the business people and also simple to express in the code.
Although, we could implement the event-centric model on the basis of the monolith architecture, we should not be afraid of the distributed architecture, especially if we want the system to be scalable and fault-tolerant.
As I demonstrated above, the coordination of the activities between the event-sourced business entities can be performed in the distributed environment efficiently and reliably.

1 August 2016

Reactive DDD with Akka - putting the pieces together


Introduction


In this episode we will learn how to assemble a subsystem that encapsulates functionality of a sub-domain of an e-commerce enterprise. The subsystem will be built on top of the Akka platform following a CQRS/DDDD-based approach. We will use the Akka-DDD framework, as it already implements concepts discussed previously, such as Aggregate Root and Office, and also provides other goodies (see the Readme page for the details).

The primary goal is to get familiar with the code structure of a concrete subsystem / service implementation before we take a deep dive into the topic of inter-service communication (business processes) in the next episode.


Subsystem components


As the architecture of the service adheres to the CQRS pattern, the subsystem will consist of the independent write- and read-side applications as presented on the following diagram:

On the write-side, the commands (in the form of HTTP POST requests) gets accepted by the write-front application and forwarded to the backend cluster (write-back applications) for processing. If a command succeeds, then the resulting event gets stored to the Event Store.

On the read-side, the queries (in the form of HTTP GET requests) gets accepted and executed against the View Store by the read-front application. The read-back application hosts the View Update Service, responsible for updating the View Store in reaction to the events streamed from the Event Store.

All applications can be started / restarted independently of each other.


The Sales Service


Let's checkout the code of the Sales Service, that is one of the services of a sample e-commerce system developed under the ddd-leaven-akka-v2 project.

The Sales sub-domain of the e-commerce enterprise covers the preliminary phase of the Order Process during which a customer adds or removes products to his/her shopping-cart and eventually confirms or cancels the order. To fulfill this functionality the Sales Service exposes a Reservation office.


The contract of the Reservation Office


The protocol/contract that the Reservation office publishes to the outside world consists of commands (the Reservation office is ready to handle) and events (the Reservation office is writing to its journal) together with referenced, shared domain objects (such as Product and Money). All these classes are contained in the contracts module which other write- and read-side modules depend on. A client application (that wishes to send a command) or a service consumer (for example a Receptor from another subsystem that subscribes to the Reservation events) must adhere to the contract. As different subsystems can get released / redeployed independently (which is a great advantage over the monolith system), changes in the contract of one service can break its consumers. Therefore in the long run, it is necessary to apply schema evolutions techniques, such as schema versioning or extensible serialization formats.

Akka-DDD supports json format as it is natively supported by the underlying Event Store provider. The currently used json library is Json4s. If you implement the commands and events as simple Scala case classes (no polymorphic lists, no Scala's Enumeration, etc) you don't need to worry about serialization layer at all. If for some reason, you need to deal with such "extensions", don't forget to provide the serialization hints by implementing and registering the JsonSerializationHintsProvider class.

The office should also publish its identifier to be used across the system by the service consumers and client applications.
The office identifier should allow obtaining the identifier of the office journal and the identifiers of the journals of the individual clerks.

Akka-DDD provides the RemoteOfficeId class to be used for that purpose:

The identifier of the Reservation office is shown below:

The messageClass property defines the base class of all message classes that the office is ready to handle (this information helps to auto-configure the command dispatching that is performed by the write-front application).


The Sales Service backend application [write-back]


The executable SalesBackendApp class, located in the write-back module, starts the Sales Actor System based on provided configuration file. The configuration must contain the following entries:

  • entries that enable Akka Cluster capabilities,
  • entry that enables Cluster Client Receptionist extension,

  • entries that indicate the journal and the snapshot-store plugins.

    The Cluster Client Receptionist extensions allows direct communication between actors from the write-front application and the backend cluster.

The startup procedure of the Sales Service backend application is straightforward. First the Sales Actor System joins the cluster using seed nodes. Addresses of the seed nodes are obtained from a file that is specified using the APP_SEEDS_FILE environment variable. If the file is missing, the address is constructed from the app.host (defaults to localhost) and app.port configuration entries. Then the Reservation office gets created:

Assuming the newicom.dddd.cluster package object is available in the scope of the startup procedure (the package object is imported), the actual office creation is delegated to the cluster-aware / sharding-capable OfficeFactory object that is injected automatically as an implicit argument of the office method. The office factory requires a clerk factory and a shard allocation strategy to be implicitly provided for the given AggregateRoot (clerk) class. For the Reservation these objects are defined in the SalesBackendConfiguration trait that the SalesBackendApp mixes-in. The Office object that is eventually created contains the office identifier and the address (in the form of an ActorPath) of the office representative Actor.

The Reservation clerk (Aggregate Root)

Implementation of the Reservation clerk and the corresponding test is rather simple and self-explaining.

Please note that the office factory requires one more parameter to be implicitly provided for the given clerk class: the local office identifier (LocalOfficeId). This is an alternative form of the office identifier, prescribed to be used locally, within the write-back application. The local office identifier must indicate the class of the clerk, so the best place to define it, is the companion object of the clerk class (see Reservation#officeId).


The write-front application


Most of the building blocks of the write-front application is provided by the akka-ddd-write-front module that is part of the Akka-DDD framework.

The Command Dispatcher

The Command Dispatcher is the core component of the write-front application. CommandDispatcher trait takes care of forwarding the incoming commands to the appropriate offices based on the provided remote office identifiers. The forwarding is performed using the Cluster Client (ClusterClientReceptionist extension must be enabled).

The HTTP Command Handler

To make the offices available to the wide range of client applications, the write-front application should accept commands in the form of HTTP POST requests. The HttpCommandHandler is the component that implements all the steps of the command handling logic in the write-front application. First of all it takes care of unmarshalling the command from the incoming request. The request must contain Command-Type attribute in its header, to indicate the class of the command that is passed in the request body. JSON is the expected format in which the command is encoded. Once the command is unmarshalled, the HTTP Command Handler passes it further to the Command Dispatcher. Eventually, once the command is processed on the backend and the response is received from the office (asynchronously), the handler converts it to an appropriate HTTP response that needs to be returned to the client.

The processing logic encapsulated in the HTTP Command Handler, is exposed as the Akka HTTP Route object being the result of calling the handle method:

The route returned by the HTTP Command Handler, is a building block of the complete route that needs to be implemented by the write-front application. Thanks to the Akka HTTP the complete http handler can be easily assembled. Just take a look at the route method of the write-front HTTP Server of the Sales Service:

And that's all for now when it comes to the write side of the system. We didn't cover Receptors and Sagas which are to be presented in the forthcoming episode.

To test if the write side of the Sales Service is operating properly we can start the sales-write-back and sales-write-front applications (see the Wiki for detailed instructions) and send a CreateReservation command. We will use httpie for this:

Hopefully you get the successful result (200 OK).


The Sales View Update Service [read-back]


The view side of the system is not that much interesting as the write side. Again, the logic of the processing is provided by the Akka-DDD. Please see a big picture of the View Update Service. In order to create a View Update Service for the SQL-based View Store provider, we need to extend the SqlViewUpdateService abstract class and provide simple configuration objects. The configuration object defines a sequence of projections for a given office (see: SalesViewUpdateService). The implementation of the projection is self-documenting:

The consume method must return an instance of a parameterized ProjectionAction[E <: Effect] which is a type alias of slick.dbio.DBIOAction[Unit, NoStream, E]

Projections can easily be tested using the in-memory H2 database. See the ReservationProjectionSpec.


The Reservation View Endpoint [read-front]


Finally we need to expose the views to the client applications via HTTP interface. This is a role of the read-front application. The HTTP server is implemented using Akka-HTTP in similar way as for the write-front application. The abstract route method that is defined in the abstract ViewEndpoint must be implemented. The method takes viewStore of type slick.jdbc.JdbcBackend as an input parameter. The view is serialized using json format before it is returned to the client. Please see the implementation of the ReservationViewEndpoint. Note that the view access layer is reused from the read-back application.

After starting the sales-read-back and sales-read-front applications we should be able to fetch a view of the Reservation, that we created previously, using a http client:


"Microservices come in systems"


Since "One microservice is no microservice", in the next episode, we will see how to implement a business logic that can't be fulfilled by Sales Service alone. Stay tuned.