Reactive DDD with Akka
2014-04-15 · cqrs · ddd · akka
When comparing traditional and DDD/CQRS architectures (see my previous article) I said that the goal of a DDD model is to decompose a complex business domain into manageable pieces, taking scalability and consistency requirements into account. By bringing in concepts like bounded contexts, transaction boundaries and event-based communication, DDD and CQRS are great enablers for building scalable software. But the example services I have presented so far were meant to run on top of a relational database and within global transactions. This is a very limiting architecture, not suited for building scalable software. Continuous consistency of the underlying data, guaranteed by global transactions, should not be perceived as a standard requirement of any system, enterprise-class systems included. It’s an artificial requirement that we have all got used to, but it does not address the real requirements of the customer.
To fully benefit from a DDD/CQRS architecture we should change the underlying technology. Today we have a choice. There are a lot of NoSQL databases, and there are a few platforms that treat scalability as a first-class concern. For the JVM, Akka (part of the Typesafe platform) is the most robust open-source platform for building event-driven applications. The recently released akka-persistence module takes care of long-running, persistable processes (this is what Aggregate Roots and Sagas are all about). This is a great feature that allows us to think of Akka as a complete platform for building enterprise applications.
Let’s then start building an event-driven, scalable, resilient and responsive (in short: reactive) application using Akka and other goodies from the Typesafe platform. I have already started a project on GitHub. You are welcome to contribute!
Below is the first lesson I learned from the project and want to share with you. Hopefully more lessons will come (see the open tickets on GitHub).
Lesson 1 - Aggregate root is an actor
The source code for lesson 1 is available here
The goal of lesson 1 is to learn how to build an event-sourced Aggregate Root (AR) on Akka.
The idea is simple. An Aggregate Root should be modeled as a stateful Actor that accepts Commands and produces Events. Because an actor is message-driven, we can send Command messages directly to the Aggregate Root, avoiding the “command to method call” transformation.
As already mentioned, akka-persistence provides the necessary artifacts for building persistable/stateful actors. The key component is the akka.persistence.Processor trait. A Processor is an actor capable of restoring its state (aka recovering) during reincarnation (start or restart of the actor). The underlying storage is a pluggable, append-only journal.
Command sourcing
Any message of type Persistent that comes to a processor is stored in the journal before it is processed. During recovery, persistent messages are replayed to the processor so that it can restore its internal state from them.
This pattern (called command sourcing) is not particularly applicable to Aggregate Roots, because replaying a command that has not yet been validated is not desired.
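The concern can be illustrated with a minimal sketch in plain Scala, without any Akka dependency. The Deposit and Withdraw commands, the CommandSourcedAccount class and the in-memory journal are hypothetical stand-ins for a Processor and its journal, not code from the project.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical commands for illustration; not part of the project.
sealed trait AccountCommand
final case class Deposit(amount: Int) extends AccountCommand
final case class Withdraw(amount: Int) extends AccountCommand

// Stand-in for a command-sourced processor: every command is journaled
// BEFORE it is validated.
final class CommandSourcedAccount(journal: ArrayBuffer[AccountCommand]) {
  private var balance = 0
  var rejected = 0 // commands that failed validation, replays included

  // Recovery: replay every journaled command through the same handler.
  journal.foreach(process)

  def handle(cmd: AccountCommand): Unit = {
    journal += cmd // stored first ...
    process(cmd)   // ... validated afterwards
  }

  private def process(cmd: AccountCommand): Unit = cmd match {
    case Deposit(a)                  => balance += a
    case Withdraw(a) if a <= balance => balance -= a
    case Withdraw(_)                 => rejected += 1 // would break the invariant
  }

  def currentBalance: Int = balance
}

object CommandSourcingDemo {
  // Returns (journal size, balance after restart, rejections seen during replay).
  def run(): (Int, Int, Int) = {
    val journal = ArrayBuffer.empty[AccountCommand]
    val account = new CommandSourcedAccount(journal)
    account.handle(Deposit(100))
    account.handle(Withdraw(500)) // rejected, yet already in the journal
    val restarted = new CommandSourcedAccount(journal) // simulated restart
    (journal.size, restarted.currentBalance, restarted.rejected)
  }
}
```

In this sketch the invalid Withdraw ends up in the journal and has to be validated and rejected again on every recovery.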
Event sourcing
To build an AR we need to extend EventsourcedProcessor, which adds event sourcing capability (the Eventsourced trait) to the Processor trait: only produced events are stored in the journal. This means we need to explicitly invoke the persist(event) method of the Eventsourced trait to store the produced event in the journal after the command message has been validated (by validation I mean ensuring that the AR’s invariants will not be compromised by the command). Since the persist method persists events asynchronously (it does not block the current thread), it accepts a callback (event handler) as its second argument. The main responsibility of an event-sourced AR is to provide an event handler that updates the internal state of the AR and handles the event by publishing it and/or sending a response to the client/command sender. Handling of the event should be customizable.
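The validate, persist, then handle flow described above can be sketched in plain Scala as follows. This is not the Akka API: the persist method here only mimics the two-argument-list shape of persist(event)(handler) and runs synchronously, whereas the real one is asynchronous. AddItem, ItemAdded, EventSourcedCart and the reply function are all hypothetical names chosen for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical protocol for illustration.
final case class AddItem(name: String)   // command
final case class ItemAdded(name: String) // event

// Stand-in for an event-sourced processor with an in-memory journal.
final class EventSourcedCart(journal: ArrayBuffer[ItemAdded]) {
  private var items = Vector.empty[String]

  // Recovery: the journal holds only validated events, so replay just updates state.
  journal.foreach(updateState)

  private def updateState(e: ItemAdded): Unit = items = items :+ e.name

  // Simplified, synchronous imitation of persist(event)(handler).
  private def persist(event: ItemAdded)(handler: ItemAdded => Unit): Unit = {
    journal += event // store the event first
    handler(event)   // then invoke the event handler
  }

  // Command handling: validate, then persist the resulting event.
  def handle(cmd: AddItem, reply: Any => Unit): Unit =
    if (items.contains(cmd.name)) reply(s"rejected: ${cmd.name} already in cart")
    else persist(ItemAdded(cmd.name)) { e =>
      updateState(e)        // 1. update internal state
      reply("Acknowledged") // 2. respond to the command sender
    }

  def contents: Vector[String] = items
}

object EventSourcingDemo {
  // Returns (state after restart, replies received, journal size).
  def run(): (Vector[String], Vector[Any], Int) = {
    val journal = ArrayBuffer.empty[ItemAdded]
    val replies = ArrayBuffer.empty[Any]
    val reply: Any => Unit = r => { replies += r; () }
    val cart = new EventSourcedCart(journal)
    cart.handle(AddItem("book"), reply)
    cart.handle(AddItem("book"), reply) // fails validation, nothing is journaled
    val recovered = new EventSourcedCart(journal) // simulated restart
    (recovered.contents, replies.toVector, journal.size)
  }
}
```

Because the rejected command never produces an event, recovery replays a single ItemAdded and needs no validation logic at all.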
AggregateRoot trait
Let’s see how to build an abstract event-sourced AggregateRoot class.
The abstract AggregateRoot keeps its state in a private variable of type AggregateState (abstract) and takes care of updating this variable whenever an event is produced/raised (raise method) or replayed (receiveRecover method). The state itself (a concrete implementation of AggregateState) should be an immutable class implementing the apply method, which defines the state transition for each event (except initialization). Initialization of the state is performed by AggregateRootFactory - the abstract member of the AR that must be overridden in the concrete implementation of the AR. Initialization is event-driven as well, which means that AggregateRootFactory creates the initial state from an event. To complete the picture, the raise(event) method calls the persist method and, after the event is persisted, calls either the default handler or the handler provided as the second (optional) argument of the raise method. The default handler publishes the event to the event bus (provided by Akka) and sends an Acknowledged message back to the sender.
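The split between a factory that creates the initial state and an immutable state that applies subsequent events can be sketched in plain Scala. This is a simplified model under stated assumptions, not the project's code: persistence, the event bus and the raise method are left out, apply is a plain method rather than whatever signature the project uses, and DomainEvent, AccountOpened, MoneyDeposited, Account and AggregateRootSketch are hypothetical names.

```scala
// Hypothetical events for illustration.
sealed trait DomainEvent
final case class AccountOpened(owner: String) extends DomainEvent
final case class MoneyDeposited(amount: Int) extends DomainEvent

// Immutable state: apply defines the transition for every event after initialization.
trait AggregateState {
  def apply(event: DomainEvent): AggregateState
}

final case class Account(owner: String, balance: Int) extends AggregateState {
  def apply(event: DomainEvent): AggregateState = event match {
    case MoneyDeposited(a) => copy(balance = balance + a)
    case other             => throw new IllegalStateException(s"unexpected event: $other")
  }
}

abstract class AggregateRootSketch {
  // Creates the initial state from the first event.
  def factory: PartialFunction[DomainEvent, AggregateState]

  private var stateOpt: Option[AggregateState] = None

  def state: AggregateState =
    stateOpt.getOrElse(throw new IllegalStateException("not initialized"))

  // Single update path, used both for raised and for replayed events.
  final def updateState(event: DomainEvent): Unit = {
    val next = stateOpt match {
      case None    => factory(event) // initialization is event-driven
      case Some(s) => s(event)       // regular state transition
    }
    stateOpt = Some(next)
  }
}

object AggregateStateDemo {
  def run(): AggregateState = {
    val ar = new AggregateRootSketch {
      val factory: PartialFunction[DomainEvent, AggregateState] = {
        case AccountOpened(owner) => Account(owner, 0)
      }
    }
    Seq[DomainEvent](AccountOpened("alice"), MoneyDeposited(30), MoneyDeposited(12))
      .foreach(ar.updateState)
    ar.state
  }
}
```

Keeping one update path for both raised and replayed events is what makes the state after recovery identical to the state before the restart.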
Reservation AR
Please take a look at the implementation of a concrete Aggregate Root (Reservation). The code should be self-explanatory. Command processing consists of validation and raising an event.
ReservationSpec verifies that the Reservation AR is in fact a stateful component capable of handling the reservation process. The test simply sends several commands to the Reservation AR in a valid order and verifies that the expected events have been persisted. In the middle of the process the Reservation actor is restarted to verify that it preserves its state. And in fact it does, since subsequent commands are handled successfully.
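The shape of such a test can be sketched without Akka. The commands, events and ReservationSketch class below are hypothetical and may differ from the names used in the project; the restart is simulated by constructing a second instance over the same in-memory journal.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical commands and events; the real project may use different names.
sealed trait ReservationCommand
case object Create extends ReservationCommand
final case class ReserveProduct(productId: String) extends ReservationCommand
case object Close extends ReservationCommand

sealed trait ReservationEvent
case object Created extends ReservationEvent
final case class ProductReserved(productId: String) extends ReservationEvent
case object Closed extends ReservationEvent

final case class ReservationState(products: List[String], closed: Boolean)

final class ReservationSketch(journal: ArrayBuffer[ReservationEvent]) {
  private var state: Option[ReservationState] = None

  journal.foreach(update) // recovery on (re)start

  private def update(e: ReservationEvent): Unit = {
    state = (state, e) match {
      case (None, Created)               => Some(ReservationState(Nil, closed = false))
      case (Some(s), ProductReserved(p)) => Some(s.copy(products = p :: s.products))
      case (Some(s), Closed)             => Some(s.copy(closed = true))
      case other                         => throw new IllegalStateException(s"cannot apply $other")
    }
  }

  private def raise(e: ReservationEvent): Unit = { journal += e; update(e) }

  // Command processing: validate against the current state, then raise an event.
  def handle(cmd: ReservationCommand): Unit = (state, cmd) match {
    case (None, Create)                            => raise(Created)
    case (Some(s), ReserveProduct(p)) if !s.closed => raise(ProductReserved(p))
    case (Some(s), Close) if !s.closed             => raise(Closed)
    case _ => throw new IllegalStateException(s"$cmd not allowed in state $state")
  }
}

object ReservationRestartDemo {
  def run(): Vector[ReservationEvent] = {
    val journal = ArrayBuffer.empty[ReservationEvent]
    val first = new ReservationSketch(journal)
    first.handle(Create)
    first.handle(ReserveProduct("product-1"))
    val restarted = new ReservationSketch(journal) // simulated restart
    restarted.handle(Close) // succeeds only if the state was recovered
    journal.toVector
  }
}
```

If recovery did not work, the Close command sent after the restart would be rejected, because a fresh instance has no reservation to close.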
Error handling
By default, if an exception of type java.lang.Exception is thrown by an actor, the actor is restarted by its supervisor (this is defined in the default SupervisionStrategy). Exceptions are not propagated to the command sender automatically, as you might expect. We can either catch exceptions and send them back to the sender from within the receiveCommand method, or send the exception from within the preRestart method, which takes the exception as its reason argument. Overriding the preRestart method seems to be the simpler approach. Now we can test whether exceptions are returned to the sender: ReservationFailuresSpec.
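The first option, catching the exception and handing it to the sender, can be sketched in plain Scala. This is not actor code: Reserve, ErrorReplySketch and the reply function are hypothetical, and in a real actor the reply would more likely be an akka.actor.Status.Failure message sent to the sender.

```scala
import scala.util.Try

// Hypothetical command for illustration.
final case class Reserve(quantity: Int)

object ErrorReplySketch {
  // Validation throws, as an aggregate root guarding its invariants might.
  private def process(cmd: Reserve): String = {
    require(cmd.quantity > 0, "quantity must be positive")
    "Acknowledged"
  }

  // Instead of letting the exception escape (which would only trigger a restart),
  // capture it and pass it to the sender as a failed Try.
  def receiveCommand(cmd: Reserve, reply: Try[String] => Unit): Unit =
    reply(Try(process(cmd)))

  // Sends one valid and one invalid command and collects the replies.
  def run(): Vector[Try[String]] = {
    var replies = Vector.empty[Try[String]]
    val reply: Try[String] => Unit = r => replies = replies :+ r
    receiveCommand(Reserve(1), reply)
    receiveCommand(Reserve(0), reply)
    replies
  }
}
```

With this approach the sender always gets an answer, either a success or the exception describing why the command was rejected.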
In the next lesson…
Currently the client needs to get a reference to a particular instance of AggregateRoot before sending a command. It would be much easier for the client to just send the command to some command gateway. This will be the topic of the next lesson.
http://pkaczor.blogspot.com/2014/04/reactive-ddd-with-akka.html
Author: Paweł Kaczor
Software Developer, passionate about functional programming, the Scala programming language and the DDD/CQRS/ES architecture.