Sunday, April 19, 2020

Reactive Architecture: CQRS and Event Sourcing

CQRS/ES stands for Command Query Responsibility Segregation and Event Sourcing. These are different techniques and can be used separately, but when used together they enable us to build systems that are more resilient and elastic. As always, we need to make trade-offs when we plan to use these techniques.

In the traditional style of data persistence, every update to the database overwrites the older state of the data with the current state. The previous state is no longer available, and we only have the current version of the data. This approach is called State Based Persistence. It captures the current state of the data, but doesn't tell us how we got there. If we find an issue in the current state that was caused by some change in the past, we can't trace it back to that change, and so we can't reliably fix it.

A workaround for these problems is to use an audit log. An audit log captures all the events in history that led to the current state, so we can check which past event caused the issue and fix the current state. But an audit log has its own issues. First, the audit log and the state can go out of sync. Second, if we store the audit log in a file, we can't update the data and the audit file in the same transaction. Also, we now have to decide which of the two is the source of truth: the data or the audit log.

Event Sourcing (ES) is an alternative approach that solves these issues. ES does not persist state at all; it only persists the events that cause changes in state. The event log is append only: we never modify or delete an existing event. So ES is about capturing the intent rather than the destination. It captures the whole journey of events that led to the current state of the system, and to determine that state we simply replay the events. We need to make sure that side effects of the original execution of an event are not repeated when we replay it. For example, if a notification was sent when an event was originally captured, that notification must not be sent again while replaying the event to determine the current state.
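
A minimal sketch of the replay idea in Python, assuming a hypothetical bank-account domain (the event classes, `Account` and the `notify` callback are illustrative names, not from any library). State is derived by folding the events through a pure function, and the side effect runs only when an event is first recorded, never during replay.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Deposited:
    amount: int


@dataclass(frozen=True)
class Withdrawn:
    amount: int


def apply(balance: int, event: object) -> int:
    """Pure state transition: no side effects allowed here."""
    if isinstance(event, Deposited):
        return balance + event.amount
    if isinstance(event, Withdrawn):
        return balance - event.amount
    raise ValueError(f"unknown event {event!r}")


class Account:
    def __init__(self, notify: Callable[[object], None]):
        self.log: List[object] = []  # append-only event log: the only thing we persist
        self.notify = notify         # hypothetical side effect, e.g. sending an email

    def record(self, event: object) -> None:
        self.log.append(event)       # persist the intent, not the resulting state
        self.notify(event)           # side effect runs once, at record time only

    def current_balance(self) -> int:
        balance = 0
        for event in self.log:       # replay uses only the pure apply(), never notify()
            balance = apply(balance, event)
        return balance
```

Recording `Deposited(100)` and then `Withdrawn(30)` gives a balance of 70, and asking for the balance again replays the log without sending any further notifications.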

In most cases, the number of events we need to replay to determine the current state is not very high. But for some use cases it may be hundreds or thousands of events, and replaying so many events may hurt system performance. For such scenarios we can use snapshots: at regular intervals we store a snapshot of the state of the system at that point in time. To determine the current state, we then start from the latest snapshot and replay only the events that occurred after it.
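
The snapshot approach can be sketched as follows, assuming for simplicity that events are plain integer deltas to a counter and that a snapshot is taken every `SNAPSHOT_EVERY` events (both are illustrative choices, not a prescribed design).

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

SNAPSHOT_EVERY = 100  # hypothetical interval; tune per use case


@dataclass
class EventStore:
    events: List[int] = field(default_factory=list)  # append-only log of deltas
    # Latest snapshot as (number of events it covers, state after those events).
    snapshot: Optional[Tuple[int, int]] = None

    def append(self, delta: int) -> None:
        self.events.append(delta)
        if len(self.events) % SNAPSHOT_EVERY == 0:
            # Record the state after all events so far; the log itself is untouched.
            self.snapshot = (len(self.events), self.current_state())

    def current_state(self) -> int:
        start, state = self.snapshot if self.snapshot else (0, 0)
        # Replay only the events recorded after the latest snapshot.
        for delta in self.events[start:]:
            state += delta
        return state

    def events_since_snapshot(self) -> int:
        start = self.snapshot[0] if self.snapshot else 0
        return len(self.events) - start
```

After 250 appends, the latest snapshot covers the first 200 events, so determining the current state replays only the remaining 50.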

ES provides several advantages in system building:
  • Built-in audit log
  • Append-only database operations are more efficient than update operations
  • Errors made in the past are easy to rectify: fix the bug, discard the incorrectly derived state and replay the events

It is common for a data model to change over time. With state based persistence this is not much of an issue: we can add or remove columns and provide default values for records already in the system. With ES it is more complicated, because the event log is sacrosanct: an event in it can't be modified or deleted. So when the data model evolves, we need to add a new version of the event, and the application layer has to keep supporting the older versions. A best practice is to use a flexible format for events, like protobuf. On the other hand, something like Java serialization is highly inflexible.
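
One common way to keep supporting older event versions is an "upcaster" that converts a stored old-version event into the current shape when it is read, while the stored event stays untouched. This is a sketch with plain dicts standing in for a serialized format such as protobuf; the event type, field names and version numbers are hypothetical.

```python
from typing import Any, Callable, Dict, Tuple

Event = Dict[str, Any]


def upcast_customer_registered_v1(event: Event) -> Event:
    # Hypothetical schema change: v1 stored "name"; v2 splits it into first/last name.
    first, _, last = event["name"].partition(" ")
    return {"type": event["type"], "version": 2,
            "first_name": first, "last_name": last}


# Maps (event type, version) to a function producing the next version.
UPCASTERS: Dict[Tuple[str, int], Callable[[Event], Event]] = {
    ("CustomerRegistered", 1): upcast_customer_registered_v1,
}


def read_event(stored: Event) -> Event:
    """Return the event in its latest version; the stored event is never modified."""
    event = stored
    key = (event["type"], event.get("version", 1))
    while key in UPCASTERS:
        event = UPCASTERS[key](event)  # each step builds a new dict
        key = (event["type"], event.get("version", 1))
    return event
```

Chaining one upcaster per version step keeps application code working only with the latest version, at the cost of maintaining the chain.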

In Domain Driven Design, we use concepts like aggregate and aggregate root. For example, in a reservation system we may have reservation as the aggregate root, which fits well with writing reservations to the database. But if we want to fetch all reservations for a customer, customer becomes a better aggregate root. This shows a conflict between the read and write models: a single aggregate root doesn't support all possible queries well. Moreover, systems generally have different loads for reads and writes. So supporting both reads and writes with the same model is often not a good idea.

Command Query Responsibility Segregation (CQRS) is a technique to tackle this problem. It is based on having separate models for write and read operations. When the system receives a command, it goes through the write model, and when a query arrives, it goes through a read model. We can have multiple read models based on the use case. For example, in a reservation system we can have one read model to fetch reservations by location (using location as the aggregate root) and another to fetch reservations by customer (using customer as the aggregate root). Because the models are separate, we can optimize each one for its needs: the write model is optimized for writing, while the read models are optimized for reading. For example, we can use separate schemas, such as a denormalized read schema that needs no joins and therefore makes reads fast; we can use different data stores (polyglot persistence, e.g. Elasticsearch for efficient reads, Cassandra for very efficient writes); or we can scale reads and writes differently according to their load.
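
A small sketch of the separation, using the reservation example: the write model owns the authoritative data keyed by reservation id and validates commands, while two read models index the same reservations by customer and by location. All class names are hypothetical, and pushing updates to the read models synchronously inside `handle` is a simplification; in practice they are often updated asynchronously.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class MakeReservation:  # command
    reservation_id: str
    customer: str
    location: str


class ReadModel:
    """Query side: a view indexed by an arbitrary key (customer, location, ...)."""

    def __init__(self, key: Callable[[MakeReservation], str]):
        self._key = key
        self._index: Dict[str, List[str]] = defaultdict(list)

    def on_reserved(self, cmd: MakeReservation) -> None:
        self._index[self._key(cmd)].append(cmd.reservation_id)

    def query(self, key: str) -> List[str]:
        return list(self._index.get(key, []))  # never modifies state


class ReservationWriteModel:
    """Command side: keyed by reservation id, enforces the business rules."""

    def __init__(self, read_models: List[ReadModel]):
        self._reservations: Dict[str, MakeReservation] = {}
        self._read_models = read_models

    def handle(self, cmd: MakeReservation) -> bool:
        if cmd.reservation_id in self._reservations:
            return False                  # reject a duplicate reservation
        self._reservations[cmd.reservation_id] = cmd
        for model in self._read_models:   # propagate the change to every read model
            model.on_reserved(cmd)
        return True
```

Adding a third query shape would mean adding another `ReadModel` with a different key, without touching the write model.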

Though ES and CQRS can be used independently, they are often used together because they complement each other very well. The write model is used just to persist events. But events are not a good fit for reading, so we use a process called projection, which consumes events and transforms them into a more read-friendly form, usually a denormalized one. The read model then reads from this projected form rather than from the original events.
A CQRS/ES based system also makes it easy to add new queries: we just add a new projection that transforms events into the desired form. Read and write models can evolve independently as needed.
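
Projections can be sketched as consumers that read the event log from their own offset and maintain their own view. The event type and handler functions below are hypothetical. Because the log is never modified, a projection added later simply starts at offset 0 and catches up on the full history.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class ReservationMade:
    reservation_id: str
    customer: str
    guests: int


class Projection:
    """Consumes events in order and keeps its own read-friendly view."""

    def __init__(self, handler: Callable[[Dict[str, int], ReservationMade], None]):
        self.view: Dict[str, int] = {}
        self.offset = 0  # how far into the log this projection has read
        self._handler = handler

    def catch_up(self, log: List[ReservationMade]) -> None:
        for event in log[self.offset:]:
            self._handler(self.view, event)
        self.offset = len(log)


def count_by_customer(view: Dict[str, int], e: ReservationMade) -> None:
    view[e.customer] = view.get(e.customer, 0) + 1


def guests_by_customer(view: Dict[str, int], e: ReservationMade) -> None:
    view[e.customer] = view.get(e.customer, 0) + e.guests
```

A projection built with `count_by_customer` can run from day one, and a `guests_by_customer` projection introduced months later produces a complete view by replaying the same log.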

Cost of CQRS/ES

CQRS/ES has various advantages, as discussed above, but it also has costs, the major ones being:
  • CQRS is generally considered more complex
  • We may need to maintain multiple data stores
  • More classes/objects (commands, events, queries)
  • More storage is needed, due to the event log and the data duplicated in read models

Saturday, April 18, 2020

Reactive Architecture: Distributed Messaging Patterns

According to the Reactive Manifesto, a critical requirement for any Reactive system is that it be message driven. Message driven systems communicate through asynchronous, non-blocking messages. Such messages help us build systems that are both resilient and elastic, and therefore responsive under a variety of conditions. But choosing to build systems with asynchronous, non-blocking messages has consequences.

Message Driven Architecture

In a message driven architecture, components communicate using asynchronous, non-blocking messages. A component sends a message to another component but doesn't wait for the response; it continues with other tasks, and the response, if any, arrives asynchronously. This ensures that different parts of the system are not waiting on each other. This way of communicating provides several key advantages:

  • Resources like threads and CPU cycles are not blocked
  • Contention is reduced, which improves scalability
  • If a component is down, messages to it can be queued and delivered later, which improves the reliability of the system

Asynchronous messages must form the backbone of reactive systems. That doesn't mean synchronous messaging should never be used. We may need synchronous messages, for example to acknowledge receipt of a message, even though the message itself is processed asynchronously.
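
A minimal asyncio sketch of these properties: the sender enqueues messages and carries on without waiting for them to be processed, and because the receiver is not running yet (standing in for a component that is down), the queue buffers the messages until it comes up. The queue here is an in-memory stand-in for a real broker or mailbox, and the function names are hypothetical.

```python
import asyncio
import contextlib
from typing import List


async def sender(mailbox: asyncio.Queue, log: List[str]) -> None:
    for i in range(3):
        await mailbox.put(f"order-{i}")  # enqueue and move on; no waiting for processing
        log.append(f"sent order-{i}")
    log.append("sender continued with other work")


async def receiver(mailbox: asyncio.Queue, log: List[str]) -> None:
    while True:
        msg = await mailbox.get()
        log.append(f"processed {msg}")
        mailbox.task_done()


async def main() -> List[str]:
    log: List[str] = []
    mailbox: asyncio.Queue = asyncio.Queue()  # buffers messages while the receiver is down
    await sender(mailbox, log)                # receiver is not running yet
    consumer = asyncio.create_task(receiver(mailbox, log))  # receiver comes back up
    await mailbox.join()                      # wait until every queued message is processed
    consumer.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await consumer
    return log
```

Running `asyncio.run(main())` shows all three sends and the sender's other work logged before any message is processed.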

Asynchronous messaging is technically more challenging than synchronous messaging. In distributed systems, managing a transaction across multiple databases and microservices is difficult, because keeping a transaction open for a long period of time increases the probability of failure and hence makes the system brittle.

Saga

A saga is a way of representing a long running transaction. It encapsulates multiple requests, which can run either in parallel or sequentially. When all requests complete successfully, the saga is marked as a success. To handle failures, every request is paired with a compensating action. If any request fails, the compensating actions are executed for all requests that have succeeded so far, and the saga is marked as a failure. If a compensating action fails, we keep retrying it until it succeeds.
If a request times out, either the request failed, or it succeeded but the reply was lost, or it is still queued. Because we can't tell which, we may have to run its compensating action anyway, so we need to make sure that the compensating action has no negative impact if the request itself failed.
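
A sequential saga can be sketched as below. `Step`, `run_saga` and the retry limit are hypothetical names chosen for illustration. On an ordinary failure, the completed steps are compensated in reverse order; on a timeout, the step's outcome is unknown, so it is compensated too, which is exactly why compensations must be harmless when the original request failed. The text says to retry a failed compensation until it succeeds; this sketch bounds the retries to keep it finite.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Step:
    name: str
    action: Callable[[], None]      # the request; raises on failure
    compensate: Callable[[], None]  # undoes the request; must be safe to run even if it failed


def _compensate(steps: List[Step], attempts: int) -> None:
    for step in reversed(steps):            # undo in reverse order
        for _ in range(attempts):           # retry compensation (bounded in this sketch)
            try:
                step.compensate()
                break
            except Exception:
                pass


def run_saga(steps: List[Step], max_compensation_attempts: int = 5) -> bool:
    completed: List[Step] = []
    for step in steps:
        try:
            step.action()
        except TimeoutError:
            # Outcome unknown (it may have succeeded), so compensate this step as well.
            _compensate(completed + [step], max_compensation_attempts)
            return False
        except Exception:
            _compensate(completed, max_compensation_attempts)
            return False
        completed.append(step)
    return True                             # saga marked as success
```

The `TimeoutError` clause comes first because `TimeoutError` is itself a subclass of `Exception`.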

Message Delivery Guarantee

In distributed systems, message delivery poses an interesting problem. We always strive for Exactly Once delivery. But when component A sends a message to component B, it can't be sure whether the message was delivered. B can send back an acknowledgement, but the acknowledgement itself can be lost in the network, so we are never sure of delivery. A can retry, but that may lead to duplicate messages. This means that an Exactly Once delivery guarantee is not possible.
We therefore have to settle for one of the following delivery guarantees:

At Most Once Delivery
With this approach we send a message just once and don't retry in case of failure. As a consequence, the message is never duplicated, but it may be lost, so there is no guarantee of delivery: the receiver gets the message once or never. Since we don't retry, we don't need to store the message, and the approach is easy to implement.

At Least Once Delivery
This approach guarantees that every message is eventually delivered. If there is a failure, we retry sending the message. The failure may be either that the message was not delivered or that the acknowledgement from the receiver was lost, so a retry may duplicate the message, but the message is eventually delivered. This also means we need durable storage for the message, like a file system or a database, so that it isn't lost before it is acknowledged.
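
The two guarantees can be compared with a simulated lossy link that drops transmissions according to a fixed pattern, so the behaviour is deterministic. Everything here is hypothetical: `LossyLink`, the in-memory `outbox` standing in for durable storage, and the retry limit. The receiver deduplicates by message id, a common way to make at-least-once delivery safe to process.

```python
from typing import Iterator, List, Set


class LossyLink:
    """Simulated network: True in the pattern means that transmission is dropped."""

    def __init__(self, pattern: List[bool]):
        self._pattern: Iterator[bool] = iter(pattern)

    def delivers(self) -> bool:
        return not next(self._pattern, False)  # deliver everything once the pattern runs out


class Receiver:
    def __init__(self):
        self.received: List[str] = []   # every copy that arrived
        self.processed: List[str] = []  # effects applied once per message id
        self._seen: Set[str] = set()

    def on_message(self, msg_id: str, body: str) -> None:
        self.received.append(body)
        if msg_id not in self._seen:    # deduplicate so processing is idempotent
            self._seen.add(msg_id)
            self.processed.append(body)


def send_at_most_once(link: LossyLink, receiver: Receiver, msg_id: str, body: str) -> None:
    if link.delivers():                 # single attempt, no retry, nothing stored
        receiver.on_message(msg_id, body)


def send_at_least_once(link: LossyLink, receiver: Receiver, msg_id: str, body: str,
                       max_attempts: int = 10) -> int:
    outbox = {msg_id: body}             # stand-in for durable storage until acknowledged
    for attempt in range(1, max_attempts + 1):
        if link.delivers():             # the message reaches the receiver
            receiver.on_message(msg_id, outbox[msg_id])
            if link.delivers():         # the acknowledgement reaches the sender
                del outbox[msg_id]
                return attempt
    raise RuntimeError("gave up for now; message stays in the outbox for a later retry")
```

With the pattern `[True, False, True, False, False]`, the first attempt is lost, the second arrives but its acknowledgement is lost, and the third succeeds: the receiver sees the message twice but processes it once.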

Messaging Patterns

There are two messaging patterns commonly used when we build reactive systems.

Point To Point
In the Point to Point messaging pattern, components call each other directly. Services know each other's APIs, and if an API changes, the dependent services need to be updated too. So the level of coupling is high, but the complexity is easier to see, and so are the dependencies.

Publish/Subscribe
In this pattern, services are not aware of each other and hence are not directly coupled. There is a message bus/broker: services publish their messages to the bus, and other services subscribe to those messages. This gives a high level of decoupling between services, which only need to know the message format. We can easily add, replace or remove services without impacting others. On the other hand, the complexity involved is harder to see, and dependencies are harder to find because services don't talk to each other directly.
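
A minimal in-memory sketch of publish/subscribe, assuming a hypothetical `MessageBus` class. A real broker adds persistence, delivery guarantees and asynchronous delivery; this one calls handlers synchronously just to show the decoupling: the publisher knows only the topic and message format, not who consumes it.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class MessageBus:
    """In-memory stand-in for a broker."""

    def __init__(self):
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: dict) -> None:
        # The publisher has no reference to any subscriber.
        for handler in self._subscribers.get(topic, []):
            handler(message)
```

A billing service and an email service can both subscribe to `"OrderPlaced"`; adding or removing either one requires no change to the publisher.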

Reactive Architecture: Reactive Microservices

Microservices are a subset of Service Oriented Architecture (SOA). The major difference is that SOA doesn't put any constraint on the way of deployment: we can deploy an SOA based application as a monolith or deploy SOA based services individually. Microservices strictly require services to be deployed independently of each other. An individual service can be deployed whenever needed, and on as many machines as required.
Major advantages and features of microservices are:
  • Each service can be deployed independently
  • Services can evolve along technologically different paths
  • Each service has its own database
  • Services can be scaled independently, as required by each service
  • Failures are isolated, giving higher availability, fault tolerance and resilience
  • Services communicate synchronously (request-response protocols like HTTP) or asynchronously (reactive microservices)
  • Services are loosely coupled
  • Rapid deployments (up to continuous deployment)
  • Features are released as and when they are ready
  • Teams are more DevOps oriented

Principles of Isolation

When we talk about the correct size of reactive microservices, a better question is how to isolate microservices so that they are less coupled and more scalable. We should try to isolate reactive microservices in terms of state, space, time and failure.

Isolation of State
This means that a microservice's data can be accessed only via its public API; there must be no attempt to access a microservice's database directly. This ensures that a microservice can evolve internally without impacting other parts of the application. We can change the code, data schemas etc. as much as we want, and the outside world remains undisturbed as long as the public API of the microservice stays the same.

Isolation in Space
It is critical that microservices invoking each other are not affected by where they are deployed. They may run on the same hardware, on different machines or in different data centres. This is called isolation in space, and it makes it easy to scale microservices up or down.

Isolation in Time
Reactive microservices communicate through asynchronous, non-blocking messaging, which means they don't wait for each other's response after an invocation. This greatly improves the use of resources like threads, CPU cycles and memory, which would otherwise be kept blocked by request-response communication. This is called isolation in time. Reactive microservices should also expect eventual consistency, which helps achieve higher scalability, because strong (total) consistency requires central coordination, which limits scalability.

Isolation of Failure
Isolation of failure means that if a microservice fails then this failure must be restricted to it, and other dependent microservices must continue to function. This makes our system resilient and fault tolerant.

Isolation Techniques

There are several techniques which allow us to achieve above-mentioned levels of isolation.

Bulkheading
Bulkheading has its origin in the shipping industry. A ship's hull is divided into sections that are totally isolated from each other, so that if one section fills with water during an accident, the other sections are unaffected and the ship stays afloat.
Similarly, in the context of reactive architecture, we isolate our components so that a failure in a component is limited to it and the system continues to perform, even if somewhat degraded. A properly bulkheaded system ensures that a failure in one microservice doesn't cascade into other microservices.
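
One way to bulkhead calls within a service is to give each downstream dependency its own bounded pool of concurrent calls, so a slow dependency can exhaust only its own compartment. This sketch uses a non-blocking semaphore per dependency and fails fast when a compartment is full; the class names, service names and limits are hypothetical.

```python
import threading
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


class BulkheadFull(Exception):
    pass


class Bulkhead:
    """Caps concurrent calls to one dependency so it cannot exhaust shared resources."""

    def __init__(self, max_concurrent: int):
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def call(self, fn: Callable[[], T]) -> T:
        if not self._slots.acquire(blocking=False):  # compartment full: fail fast
            raise BulkheadFull()
        try:
            return fn()
        finally:
            self._slots.release()


# One compartment per downstream service (hypothetical names and limits).
bulkheads: Dict[str, Bulkhead] = {"payments": Bulkhead(2), "inventory": Bulkhead(2)}
```

If the payments service hangs and two calls to it are stuck, further payment calls are rejected immediately while inventory calls keep working.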

Circuit Breakers
A circuit breaker is a mechanism that ensures that components which are already struggling under heavy stress are not put under even more stress, which is what happens if we keep calling a service that is already failing. A circuit breaker works in three states. Under normal circumstances it is in the CLOSED state and allows every call to the target service. If the service starts failing, the circuit breaker goes into the OPEN state, in which it doesn't let any request through, and we may return a custom response or exception instead. The circuit breaker waits for a configured time period, after which it goes into the HALF-OPEN state and allows a single request to the target service. If that request succeeds, the circuit breaker goes back to CLOSED; otherwise it goes back to OPEN.
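
The three states described above can be sketched as a small state machine. The failure threshold, timeout and class names are hypothetical, the clock is injectable so the behaviour can be tested without sleeping, and the sketch is single-threaded (a concurrent version would need a lock to ensure only one trial call in HALF-OPEN).

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    pass


class CircuitBreaker:
    """CLOSED -> OPEN after `max_failures` consecutive failures; OPEN -> HALF_OPEN after
    `reset_timeout` seconds; HALF_OPEN lets one trial call decide the next state."""

    def __init__(self, max_failures: int = 3, reset_timeout: float = 10.0,
                 clock: Callable[[], float] = time.monotonic):
        self.state = "CLOSED"
        self.failures = 0
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.opened_at = 0.0

    def call(self, fn: Callable[[], T]) -> T:
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("failing fast")  # don't add load to a failing service
            self.state = "HALF_OPEN"                    # timeout elapsed: allow a trial call
        try:
            result = fn()
        except Exception:
            self._record_failure()
            raise
        self.state, self.failures = "CLOSED", 0         # success closes the circuit
        return result

    def _record_failure(self) -> None:
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.max_failures:
            self.state, self.opened_at = "OPEN", self.clock()
```

While OPEN, callers get `CircuitOpenError` immediately instead of waiting on a service that is known to be failing, which is where a custom fallback response would be returned.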

Asynchronous Messaging
In traditional request-response communication, the calling component keeps waiting for a response from the called component. This blocks costly resources like threads and CPU cycles and hence hurts the performance of the system. Also, if the called service fails, the calling service fails too.
Asynchronous messaging decouples components both in time and in failure. The calling service just sends a request to another service and continues with other work, so the two are decoupled in time and no threads or CPU cycles are blocked. Failures can also be handled in a much better and easier way, because the caller is not stuck waiting on the failed service.

Gateway Service
Adding microservices to a system has lots of advantages, like high scalability, resilience and better isolation of failures. But it also adds complexity to the system, particularly for the client, which may now need to make multiple calls to individual services and aggregate the data itself. To solve this, we may introduce an intermediate service between the client and the microservices, called a gateway service. The client calls the gateway service, which calls the microservices, aggregates the data they return and sends it back to the client. We can even have one gateway service per microservice.
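
A gateway endpoint can be sketched with asyncio: one client call fans out to several services concurrently and the results are aggregated into a single response. The three service functions are hypothetical placeholders that sleep briefly instead of making real network calls.

```python
import asyncio
from typing import Any, Dict, List


# Hypothetical downstream microservices; in practice these would be network calls.
async def customer_service(customer_id: str) -> Dict[str, Any]:
    await asyncio.sleep(0.01)
    return {"id": customer_id, "name": "Alice"}


async def order_service(customer_id: str) -> List[Dict[str, Any]]:
    await asyncio.sleep(0.01)
    return [{"order_id": 1, "total": 42}]


async def loyalty_service(customer_id: str) -> int:
    await asyncio.sleep(0.01)
    return 120


async def customer_dashboard(customer_id: str) -> Dict[str, Any]:
    """Gateway endpoint: calls the services concurrently and aggregates their data."""
    customer, orders, points = await asyncio.gather(
        customer_service(customer_id),
        order_service(customer_id),
        loyalty_service(customer_id),
    )
    return {"customer": customer, "orders": orders, "loyalty_points": points}
```

The client makes one request instead of three; calling the services concurrently keeps the gateway's latency close to that of the slowest service rather than the sum of all of them.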

Tuesday, April 14, 2020

Reactive Architecture: Domain Driven Design

Domain Driven Design (DDD) is an architectural approach to designing large software systems. It has its roots in the book 'Domain Driven Design: Tackling Complexity in the Heart of Software' by Eric Evans. Interestingly, the rules and guidelines laid down by DDD are highly compatible with those prescribed by reactive architecture. DDD emphasizes breaking a large domain into smaller domains. Large domains are difficult to model, and breaking them into smaller ones simplifies the overall problem. This decomposition results in domain boundaries between the smaller pieces of the overall system.

The goal of microservices architecture is very similar: self-sufficient individual services, each catering to a specific functionality of the system, each with a well-defined API, and with clear functional boundaries between them. If we add the reactive principle of asynchronous, non-blocking interaction between these microservices, what we get is a system of reactive microservices.

Combining DDD with reactive microservices helps us determine the domain/functional boundaries between our microservices correctly, which otherwise is not an easy task. There are reactive platforms, like Lagom, that are built on the ideas and concepts of DDD and make it easy for developers to apply these ideas in their own applications.

Of course, DDD and reactive architecture can each be used without the other. But their underlying principles are highly compatible, so they are very commonly used together.

Domain and Ubiquitous language

A domain can be defined as an area of knowledge. For example, the health domain includes doctors, nurses, chemists, hospitals, clinics, medical instruments, diseases, pathologists etc. We can express this domain knowledge in various ways, like diagrams or documentation. Software is another way to express this knowledge; in this sense, software is an implementation of a domain. People who are involved in such a domain are called domain experts.
DDD is an approach in which we develop software so that it implements a domain model that domain experts can easily relate to. When we do this, it becomes very easy for domain experts and developers to communicate. The language they use is called the Ubiquitous Language in DDD terminology, and it comes from the domain experts. Developers talk to domain experts and get used to the vocabulary the experts use, instead of forcing domain experts to get used to software terms, and over time the developers' own domain knowledge grows towards expert level. There may be exceptions where developers have to add software abstractions to the Ubiquitous Language because the domain lacks a corresponding word. The basic idea is to be able to discuss the software without using software jargon.

Breaking the Domain

Domains are large and complex, and it's really difficult to model them as a whole. One of DDD's goals is to decompose the original domain into smaller sub-domains that can be understood easily. Each sub-domain then has its own ubiquitous language and model. The combination of model and language for a sub-domain is called its Bounded Context, and in this way we define bounded contexts for all of our sub-domains. There may be some duplication of ideas and concepts across sub-domains, but we should resist the temptation to abstract them into a shared concept, because over time they evolve and become different.
Bounded Contexts are one of the most important outcomes of domain decomposition, and they are very good starting points for reactive microservices. We may need to break a bounded context further into several microservices, but bounded contexts tell us which parts must not be part of the same microservice.
We may use objects to identify the bounded contexts in a domain, but a more recent approach is to use activities and events instead. This is called Event First DDD. We can simplify the process with a subject-verb-object notation, which gives us a consistent way of phrasing activities and events. For example: doctor examines patient. The doctor, the subject, performs an action, examines, on an object, the patient. Once we phrase all the activities and events in our domain this way, it becomes easy to group related events and activities together, which gives us a better idea of the bounded contexts in the domain.

Domain Activities

Within a bounded context, activities can be divided into several categories.

Command: A command represents a request to perform an action. Since it is a request, the action hasn't happened yet; it is meant for the future, and it can be rejected. An important thing about a command is that it is meant for a specific recipient, like a microservice, and when that recipient accepts and executes it, the state of the domain changes. Examples of commands are 'Place an order' or 'Make a reservation'. The recipient can reject a command for any reason it deems fit.

Event: An event represents an action that has already happened, so the notion of rejecting it makes no sense. Unlike a command, an event is broadcast to multiple destinations, like multiple microservices. Another conceptual difference is that a command causes a change in the state of the domain, while an event records that change. Hence an event is usually the consequence of a command.

Query: A query represents a request for the state of the domain, and the sender expects a response, unlike a command or an event, which doesn't necessarily warrant one. A query is generally targeted at a specific recipient. Importantly, a query should never change the state of the domain: if we repeat a query, we get the same response back as long as the state hasn't changed.

In a reactive system, asynchronous non-blocking messaging is the means of communication. Commands, events and queries are the types of messages we use in a message driven system, and they form the API of a bounded context (a microservice, for example).
If we send a command, 'Make a reservation', with all the details needed for it, we don't get back a response containing the status of the reservation (completed/rejected). What we may get is an acknowledgement that the request was received. To learn the actual status of the reservation, we may need to watch for an event like 'Reservation completed' or 'Reservation rejected'. Not waiting for the response in this way is what makes the system asynchronous and message driven.
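
The reservation flow can be sketched with asyncio: `send` only acknowledges receipt of the command, and the outcome is learned later from an event. The message classes, the capacity rule and the acknowledgement string are hypothetical; names follow the ubiquitous-language style of naming software entities after domain activities.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class MakeReservation:       # command: a request that may be rejected
    reservation_id: str
    guests: int


@dataclass(frozen=True)
class ReservationCompleted:  # event: something that already happened
    reservation_id: str


@dataclass(frozen=True)
class ReservationRejected:   # event
    reservation_id: str
    reason: str


Event = Union[ReservationCompleted, ReservationRejected]


class Reservations:
    def __init__(self, capacity: int, events: "asyncio.Queue[Event]"):
        self.capacity = capacity
        self.events = events  # where outcome events are published
        self.inbox: "asyncio.Queue[MakeReservation]" = asyncio.Queue()

    def send(self, cmd: MakeReservation) -> str:
        self.inbox.put_nowait(cmd)
        return "accepted-for-processing"  # acknowledgement only, not the outcome

    async def run(self) -> None:
        while True:
            cmd = await self.inbox.get()
            if cmd.guests <= self.capacity:
                self.capacity -= cmd.guests
                await self.events.put(ReservationCompleted(cmd.reservation_id))
            else:
                await self.events.put(ReservationRejected(cmd.reservation_id, "no capacity"))


async def demo() -> List[object]:
    events: "asyncio.Queue[Event]" = asyncio.Queue()
    service = Reservations(capacity=4, events=events)
    worker = asyncio.create_task(service.run())
    acks = [service.send(MakeReservation("r1", 3)), service.send(MakeReservation("r2", 3))]
    outcomes = [await events.get(), await events.get()]  # the status arrives as events
    worker.cancel()
    return acks + outcomes
```

Both commands are acknowledged immediately; only the events later reveal that `r1` was completed and `r2` was rejected.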

Suppose we are building a restaurant system. With the help of DDD, we have identified bounded contexts like Orders, Reservations, Payments, Customers, Menus etc.
When we implement a bounded context in code, we map the activities defined in it to software entities, using the terms of the ubiquitous language. For example, for the command 'Open an Order' we have a corresponding class or object named OpenOrder.

Domain Objects

Finding bounded contexts by defining activities also helps us in defining different types of domain objects. There are several categories of domain objects:

Value Object: A value object represents a piece of information. Two value objects containing the same information are functionally the same. A value object is immutable, because if we could modify its information it would functionally become another object. An example of a value object is an object holding address data. Though the main purpose of a value object is to hold immutable data, it can also contain some basic business logic, like logic to derive extra information from its attributes. In a reactive system, value objects are perfect for creating the messages that are passed between components.
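
In Python, a frozen dataclass is a natural fit for a value object: equality is based on the attribute values, and attributes can't be reassigned. The `Address` fields and helper methods below are illustrative.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Address:
    street: str
    city: str
    postcode: str

    def is_in(self, city: str) -> bool:
        # A small piece of business logic derived from the attributes.
        return self.city.casefold() == city.casefold()

    def with_street(self, street: str) -> "Address":
        # "Changing" a value object produces a new one; the original is untouched.
        return replace(self, street=street)
```

Two `Address` instances with the same street, city and postcode compare equal, and assigning to `address.city` raises `dataclasses.FrozenInstanceError`.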

Entity: An entity differs from a value object in that it is mutable, but it contains an immutable attribute that acts as its identifier. For example, a person can be represented as an entity. A person has attributes that change, like weight and age, but also an identifier (like an SSN) that uniquely identifies them. Two persons with the same identifier represent the same person in the system. An entity also contains a considerable amount of business logic, like all the rules that control modifications of its attributes. In an actor model, entities are perfect candidates for actors, since actors have a unique identifier in the form of their address and a mutable state.
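
An entity can be sketched as a mutable class whose equality and hash are based only on its identifier, with the identifier protected from change and business rules guarding the mutable attributes. The `Person` fields and the weight rule are illustrative.

```python
from dataclasses import dataclass


@dataclass(eq=False)  # eq=False: we define identity-based equality ourselves
class Person:
    ssn: str          # identifier: set once, never changes
    name: str
    weight_kg: float = 0.0

    def __setattr__(self, attr: str, value: object) -> None:
        if attr == "ssn" and "ssn" in self.__dict__:
            raise AttributeError("ssn is the identity and cannot change")
        super().__setattr__(attr, value)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Person) and other.ssn == self.ssn

    def __hash__(self) -> int:
        return hash(self.ssn)

    def record_weight(self, weight_kg: float) -> None:
        # Business rule controlling how a mutable attribute may change.
        if weight_kg <= 0:
            raise ValueError("weight must be positive")
        self.weight_kg = weight_kg
```

Two `Person` objects with the same SSN compare equal even if their names or weights differ, which is the opposite of how the value object above behaves.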

Aggregate: An aggregate is a specific type of object that represents a group of domain objects. At the root of the group is always an entity, called the aggregate root, which contains the other domain objects. For example, a Person can have an address, a phone number and a name. This group forms an aggregate with Person as the aggregate root, and the address, phone number and name completing the rest of the aggregate. We can't access the domain objects inside an aggregate directly; we have to go through the aggregate root. Another important aspect is that a transaction should not span multiple aggregate roots; if it does, either the aggregates are not defined correctly or the transaction has a problem. In reactive systems, aggregates are good candidates for distribution. Figuring out the aggregate roots is very important. In many cases there is a single aggregate root per bounded context, but that's not always the case.
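
A sketch of an aggregate, using a hypothetical restaurant `Order` as the aggregate root with `LineItem` value objects inside it. Callers change the aggregate only through the root's methods, which enforce its invariants; the item limit and the "no changes after submit" rule are illustrative.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class LineItem:                # value object inside the aggregate
    sku: str
    quantity: int
    unit_price: int            # in cents


class Order:                   # aggregate root: an entity identified by order_id
    MAX_ITEMS = 20             # hypothetical invariant for illustration

    def __init__(self, order_id: str):
        self.order_id = order_id
        self._items: List[LineItem] = []  # internal: only modified through the root
        self._submitted = False

    def add_item(self, sku: str, quantity: int, unit_price: int) -> None:
        if self._submitted:
            raise ValueError("cannot modify a submitted order")
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        if sum(i.quantity for i in self._items) + quantity > self.MAX_ITEMS:
            raise ValueError("order exceeds item limit")
        self._items.append(LineItem(sku, quantity, unit_price))

    def submit(self) -> None:
        if not self._items:
            raise ValueError("cannot submit an empty order")
        self._submitted = True

    @property
    def items(self) -> Tuple[LineItem, ...]:
        return tuple(self._items)         # read-only view for callers

    @property
    def total(self) -> int:
        return sum(i.quantity * i.unit_price for i in self._items)
```

Because every change goes through `Order`, a single transaction only needs to load and save this one aggregate.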

Domain Abstractions

Apart from activities and domain objects, a bounded context also contains certain abstractions. A very common example of a domain abstraction is a service. A service, unlike domain objects, is stateless and is meant to implement a specific piece of business logic that doesn't fit naturally into a value object or an entity. An accepted design practice is to define an abstraction for a service and then provide concrete implementations of it. The domain only knows about the abstraction and doesn't interact with the concrete implementation, which has the advantage that one implementation can be replaced with another when needed.
Having services is a good idea, but too many services in a domain means the services are doing most of the work, which leads to what is called an anemic domain model. Before we create a service, we should first try to have the job done by an entity or a value object.
Some good candidates for services are an email sender, a hashing service, a repository and a factory.
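
A sketch of a domain service defined as an abstraction with swappable implementations, using the hashing service mentioned above. The class and function names are hypothetical, and plain SHA-256 is only for illustration: real password storage should use a salted, deliberately slow key-derivation function.

```python
import hashlib
from abc import ABC, abstractmethod


class HashingService(ABC):
    """The abstraction the domain depends on; stateless."""

    @abstractmethod
    def digest(self, data: str) -> str: ...


class Sha256HashingService(HashingService):
    def digest(self, data: str) -> str:
        return hashlib.sha256(data.encode("utf-8")).hexdigest()


class FakeHashingService(HashingService):
    """Test double that can be swapped in without touching domain code."""

    def digest(self, data: str) -> str:
        return f"hashed:{data}"


def register_user(username: str, password: str, hasher: HashingService) -> dict:
    # Domain code sees only the abstraction, never a concrete implementation.
    return {"username": username, "password_hash": hasher.digest(password)}
```

Swapping `Sha256HashingService` for another implementation changes nothing in `register_user`.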


Sunday, April 12, 2020

Reactive Architecture and Actor Model

Compared to traditional applications, modern applications differ mainly in the following ways:

  • Distributed: Applications are distributed, i.e. installed and running simultaneously on a large number of machines, from tens to thousands.
  • Data Size: The size of the data being handled is far larger (PBs rather than GBs).
  • Active Data: Earlier, data was usually at rest, i.e. not changing actively once pulled in by a batch job that ran, say, once a day. Now data changes rapidly and we need to keep up with it, otherwise we fall behind and it's difficult to catch up.
  • Zero Downtime: In the past, application downtime for maintenance or due to crashes was considerable and acceptable. This is no longer true: users expect to access applications without interruption.
  • Fast Response: Modern applications are expected to respond lightning fast. Users have alternatives, and if an application doesn't meet their expectations they can easily move to another.


Reactive architecture is all about developing applications that are highly responsive, even under heavy load or when some parts of the application are down. The goal is to develop a system that:
  • Can be distributed over hundreds or even thousands of machines
  • Scales from a few to millions of users
  • Continues to function even if there are failures
  • Uses only the resources required for the current load
  • Maintains a satisfactory level of responsiveness even under heavy load or failures
Reactive architecture is based on the reactive principles described in the Reactive Manifesto.

  • Responsive: A reactive system is responsive, i.e. it responds in a timely manner.
  • Resilient: A reactive system stays responsive even when there are failures in the system. This is achieved through replication of components, containment of failures, isolation of failures and delegation of tasks.
  • Elastic (Scalable): A reactive system stays responsive under varying load. As load varies, the system can increase or decrease the resources allocated to handle requests. This implies a design without central bottlenecks or contention points.
  • Message Driven: A reactive system relies on asynchronous, non-blocking message passing between components to establish boundaries between them that ensure loose coupling, isolation and location transparency. These boundaries also make it possible to communicate failures as messages.

In the Reactive Manifesto's diagram of these four traits, the direction of the arrows shows that being Responsive is facilitated by the other three, being Elastic is made possible by being Message Driven and being Resilient, and being Resilient is made possible by being Message Driven and being Elastic.

Reactive programming is a programming technique that involves breaking a problem into small, discrete steps and executing those steps in an asynchronous, non-blocking manner, typically using language-specific techniques and APIs like Futures, Promises, CompletableFutures, Streams, RxJava, RxScala and several others. It is a way of ensuring that system resources like threads and CPU cycles are not blocked.

But reactive programming is different from reactive architecture. Reactive architecture applies at the architecture level, to create systems or components, like reactive microservices, that follow the reactive principles explained above. Merely using reactive programming doesn't mean that a system follows reactive principles, though reactive programming tools can be used to build reactive systems.

Actor model

The actor model is a reactive programming paradigm that enables us to build systems that follow reactive principles.

All communication between actors in the actor model is based on asynchronous, non-blocking messaging, so it naturally supports the elastic and resilient aspects of a reactive system. Additional facilities provided by the actor model, like location transparency, make it much easier to build highly responsive software systems.

On the JVM, the Akka toolkit is an implementation of the actor model that helps build reactive systems. Additional reactive tools, like Lagom and Akka Streams, are built on top of it.

The basic idea behind the actor model is to compose the system of entities called Actors. An actor encapsulates state and behaviour, just like any other object on the JVM. The difference is that actors communicate by sending asynchronous messages to each other rather than by invoking methods. This asynchronous communication gives actors location transparency: actors communicate only through this mechanism, irrespective of where they are, and local versus remote communication is merely a matter of configuration. When an actor sends a message to another actor, the message may first go to a router, which can itself be an actor, and the router routes it to a target actor residing either on the same JVM or on another JVM. The sending actor has no knowledge of the location of the actor that actually receives the message, and doesn't need to know it: the same API is used to send messages to local and remote actors. This is what location transparency is all about. We just configure routers, and the communication mechanism remains the same. This makes an actor system elastic and resilient, because we can easily deploy actors on multiple machines. If one machine goes down, the others are available to share the load, which makes the system resilient. And if the load goes up, we can easily deploy actors on new machines (or remove machines when the load goes down), which makes the system elastic.
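
The core ideas can be sketched in a single process with asyncio: each actor owns its state and processes messages one at a time from a mailbox, and callers hold only an `ActorRef` whose `tell` is fire-and-forget. This is not Akka's API; every name here is hypothetical, and routers and remoting are not modeled. In a distributed toolkit, the reference could forward messages over the network instead, without the caller's code changing, which is the point of location transparency.

```python
import asyncio
from typing import Any, List


class Actor:
    """Encapsulates state; processes one message at a time from its mailbox."""

    def __init__(self):
        self.mailbox: asyncio.Queue = asyncio.Queue()

    async def receive(self, message: Any) -> None:
        raise NotImplementedError

    async def run(self) -> None:
        while True:
            message = await self.mailbox.get()
            await self.receive(message)
            self.mailbox.task_done()


class ActorRef:
    """The only handle callers get; hides where and how the actor runs."""

    def __init__(self, actor: Actor):
        self._actor = actor

    def tell(self, message: Any) -> None:
        self._actor.mailbox.put_nowait(message)  # fire-and-forget, no return value


class CounterActor(Actor):
    def __init__(self):
        super().__init__()
        self.count = 0  # state touched only inside receive()

    async def receive(self, message: Any) -> None:
        if message == "increment":
            self.count += 1
        elif isinstance(message, tuple) and message[0] == "get":
            message[1].tell(("count", self.count))  # reply by sending a message back


class RecorderActor(Actor):
    def __init__(self):
        super().__init__()
        self.seen: List[Any] = []

    async def receive(self, message: Any) -> None:
        self.seen.append(message)


async def demo() -> List[Any]:
    counter, recorder = CounterActor(), RecorderActor()
    tasks = [asyncio.create_task(a.run()) for a in (counter, recorder)]
    counter_ref, recorder_ref = ActorRef(counter), ActorRef(recorder)
    for _ in range(3):
        counter_ref.tell("increment")
    counter_ref.tell(("get", recorder_ref))
    await counter.mailbox.join()   # wait until the counter has handled every message
    await recorder.mailbox.join()  # and the recorder has received the reply
    for t in tasks:
        t.cancel()
    return recorder.seen
```

Because the counter's state is changed only by its own message loop, no locks are needed even though many senders may `tell` it concurrently.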

Compared to other reactive programming tools like Rx, Futures, Promises and Streams, the actor model provides support for all the reactive principles. Other approaches support reactive system development only partially, so we may need to add other tools and techniques, like load balancing. The actor model, on the other hand, supports asynchronous messaging, and being location transparent it enables a system to be elastic and resilient, which in turn makes it responsive. It is still possible to build a system with the actor model that is not reactive, but using the actor model along with the tools built on it makes developing a reactive system much easier.

We can build a reactive application without the actor model. For example, we can use a service registry, service discovery and load balancers to give our system location transparency, and then add something like a message bus to enable asynchronous, non-blocking messaging between its components. This makes our system message driven, elastic and resilient, and hence responsive. But compared to the actor model, these tools and techniques are not built in; they are added on top, which means we need additional infrastructure. Another difference is that with these tools we make systems reactive at a large scale, like at the microservices level, while the components inside each microservice may not be reactive. With the actor model, the actors inside microservices can easily be made reactive too.