Imran Arshad

Reputation: 4002

Version number in event sourcing aggregate?

I am building microservices. One of my microservices uses CQRS and event sourcing. Integration events are raised in the system, and I am saving my aggregates in an event store while also updating my read model.

My question is: why do we need a version on the aggregate when we are updating the event stream for that aggregate? I read that we need it for consistency, that events have to be replayed in sequence, and that we need to check the version before saving (https://blog.leifbattermann.de/2017/04/21/12-things-you-should-know-about-event-sourcing/). I still can't get my head around this, since events are raised and saved in order, so I really need a concrete example to understand what benefit we get from versions and why we even need them.

Many thanks,

Imran

Upvotes: 11

Views: 6228

Answers (2)

Roman Eremin

Reputation: 1451

Let me describe a case where aggregate versions are useful:

In our reSolve framework, the aggregate version is used for optimistic concurrency control.

I'll explain it by example. Let's say an InventoryItem aggregate accepts the commands AddItems and OrderItems. AddItems increases the number of items in stock; OrderItems decreases it. Suppose you have an InventoryItem aggregate #123 with one event, ITEMS_ADDED with a quantity of 5. The state of aggregate #123 says there are 5 items in stock.

So your UI shows users that there are 5 items in stock. User A decides to order 3 items, user B 4 items. Both issue OrderItems commands almost at the same time; let's say user A is first by a couple of milliseconds.

Now, if you have a single instance of aggregate #123 in memory, in a single thread, you don't have a problem: the first command from user A would succeed, its event would be applied, the state would say the quantity is 2, and so the second command from user B would fail.

In a distributed or serverless system, where the commands from A and B run in separate processes, both commands would succeed and bring the aggregate into an incorrect state if we don't use some form of concurrency control. There are several ways to do this: pessimistic locking, a command queue, an aggregate repository, or optimistic locking.

Optimistic locking seems to be the simplest and most practical solution:

We say that every aggregate has a version: the number of events in its stream. So our aggregate #123 has version 1.

When an aggregate emits an event, the event data carries the aggregate version. In our case, the ITEMS_ORDERED events from users A and B will both have an event aggregate version of 2. Since an aggregate's event versions must increase sequentially, all we need to do is put a database constraint saying the tuple {aggregateId, aggregateVersion} must be unique on writes to the event store.
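A minimal sketch of that constraint, using SQLite as a stand-in event store (the table and column names are hypothetical, not from any particular framework):

```python
import sqlite3

# Hypothetical event-store schema: the UNIQUE constraint on
# (aggregate_id, aggregate_version) is what enforces optimistic locking.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        aggregate_id      TEXT    NOT NULL,
        aggregate_version INTEGER NOT NULL,
        event_type        TEXT    NOT NULL,
        payload           TEXT    NOT NULL,
        UNIQUE (aggregate_id, aggregate_version)
    )
""")

def append_event(aggregate_id, version, event_type, payload):
    """Append an event; raises sqlite3.IntegrityError if another writer
    already claimed this version of this aggregate (a concurrency conflict)."""
    conn.execute(
        "INSERT INTO events VALUES (?, ?, ?, ?)",
        (aggregate_id, version, event_type, payload),
    )
    conn.commit()

# The first writer claims version 2 of aggregate #123 and succeeds...
append_event("123", 2, "ITEMS_ORDERED", '{"quantity": 3}')

# ...a concurrent writer trying to claim the same version fails.
try:
    append_event("123", 2, "ITEMS_ORDERED", '{"quantity": 4}')
except sqlite3.IntegrityError:
    print("concurrency conflict detected")
```

The database rejects the second write purely on the uniqueness of {aggregateId, aggregateVersion}; no locks are held while the commands are being processed.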

Let's see how our example would work in a distributed system with optimistic concurrency control:

  • User A issues a command OrderItem for aggregate #123

  • Aggregate #123 is restored from events {version 1, quantity 5}

  • User B issues a command OrderItem for aggregate #123

  • Another instance of aggregate #123 is restored from events {version 1, quantity 5}

  • The aggregate instance for user A executes the command; it succeeds, and the event ITEMS_ORDERED {aggregateId 123, version 2} is written to the event store.

  • The aggregate instance for user B executes the command; it succeeds locally, but when it attempts to write its event ITEMS_ORDERED {aggregateId 123, version 2} to the event store, the write fails with a concurrency exception.

  • On such an exception, the command handler for user B simply repeats the whole procedure. This time aggregate #123 is restored into the state {version 2, quantity 2}, and the command is handled correctly (in this case it is rejected, since user B wants 4 items but only 2 remain in stock).
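The steps above can be sketched as a retry loop in the command handler. This is a toy in-memory model, not reSolve's actual API; the names (EventStore, handle_order_items, and so on) are invented for illustration:

```python
class ConcurrencyError(Exception):
    pass

class EventStore:
    """Toy in-memory event store; version = number of events in the stream."""
    def __init__(self):
        self.streams = {}  # aggregate_id -> list of events

    def load(self, aggregate_id):
        return list(self.streams.get(aggregate_id, []))

    def append(self, aggregate_id, expected_version, event):
        # expected_version is the version the caller last observed;
        # the new event becomes version expected_version + 1.
        stream = self.streams.setdefault(aggregate_id, [])
        if len(stream) != expected_version:
            raise ConcurrencyError("another writer got there first")
        stream.append(event)

def restore(events):
    """Rebuild the InventoryItem state (items in stock) from its events."""
    quantity = 0
    for event in events:
        if event["type"] == "ITEMS_ADDED":
            quantity += event["quantity"]
        elif event["type"] == "ITEMS_ORDERED":
            quantity -= event["quantity"]
    return quantity

def handle_order_items(store, aggregate_id, quantity, max_retries=3):
    """Command handler: retries the whole load-decide-append cycle on conflict."""
    for _ in range(max_retries):
        events = store.load(aggregate_id)
        if restore(events) < quantity:
            raise ValueError("not enough items in stock")
        try:
            store.append(aggregate_id, len(events),
                         {"type": "ITEMS_ORDERED", "quantity": quantity})
            return
        except ConcurrencyError:
            continue  # state changed under us: reload and decide again
    raise ConcurrencyError("gave up after retries")
```

Note that the handler re-validates the business rule (enough stock) after every reload, which is exactly why user B's command ends up rejected on the retry.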

I hope this clears up the case where aggregate versions are useful.

Upvotes: 23

Saptarshi Basu

Reputation: 9283

Yes, this is right. You need the version or a sequence number for consistency.

Two things you want:

  1. Correct ordering
    Usually events are idempotent in nature, because idempotent messages are easier to deal with in a distributed system. An idempotent message is one that gives the same result even when applied multiple times: setting a register to a fixed value (say, one) is idempotent, but incrementing a counter by one is not.

    In a distributed system, when A sends a message to B, B acknowledges it. But if B consumes the message and the acknowledgement is lost due to a network error, A doesn't know whether B received the message, so it sends it again. B then applies the message a second time, and if the message is not idempotent, the final state goes wrong. So you want idempotent messages. But if you fail to apply these idempotent messages in the same order as they were produced, your state will again be wrong. This ordering can be achieved using the version id or a sequence number. If your event store is an RDBMS, you cannot order your events without some such sort key. Kafka likewise has an offset id, and each client keeps track of the offset up to which it has consumed.

  2. Deduplication
    Secondly, what if your messages are not idempotent? Or what if your messages are idempotent, but the consumer invokes some external service in a non-deterministic way? In such cases you need exactly-once semantics, because applying the same message twice will leave your state wrong. Here too you need the version id or sequence number: if the consumer keeps track of the version id it has already processed, it can dedupe based on that id. In Kafka, you would then store the offset id at the consumer end.
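Both points can be illustrated by a consumer that tracks the last version it applied per aggregate. This is a sketch under the same InventoryItem example as the other answer; the class and field names are invented:

```python
class ReadModelProjector:
    """Consumer that enforces ordering and dedupes by aggregate version.

    last_seen maps aggregate_id -> highest version already applied, so a
    redelivered (duplicate) event is skipped, and an out-of-order event
    is detected rather than applied against a stale state.
    """
    def __init__(self):
        self.last_seen = {}
        self.quantity = {}  # the read model: items in stock per aggregate

    def apply(self, event):
        agg, version = event["aggregate_id"], event["version"]
        seen = self.last_seen.get(agg, 0)
        if version <= seen:
            return False  # duplicate delivery: already processed, skip
        if version != seen + 1:
            raise RuntimeError("out of order: expected version %d" % (seen + 1))
        delta = (event["quantity"] if event["type"] == "ITEMS_ADDED"
                 else -event["quantity"])
        self.quantity[agg] = self.quantity.get(agg, 0) + delta
        self.last_seen[agg] = version
        return True
```

Without the version check, a redelivered ITEMS_ORDERED event would decrement the stock twice, which is exactly the non-idempotent failure described above.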

Further clarifications based on comments:

The author of the article in question assumed an RDBMS as the event store. The version id or event sequence is expected to be generated by the producer. Therefore, in your example, the "delivered" event will have a higher sequence number than the "in transit" event.

The problem arises when you want to process your events in parallel. What if one consumer gets the "delivered" event and another consumer gets the "in transit" event? Clearly, you have to ensure that all events of a particular order are processed by the same consumer. In Kafka, you solve this problem by choosing the order id as the partition key. Since one partition is processed by only one consumer, you know you'll always see "in transit" before "delivered". But multiple orders will be spread across the different consumers within the same consumer group, and thus you still get parallel processing.
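The idea behind key-based partitioning is just a deterministic hash of the key. A sketch of the principle (Kafka's default partitioner actually uses murmur2, so this MD5-based function is only an illustration of the behavior, not Kafka's real algorithm):

```python
import hashlib

def partition_for(key, num_partitions):
    """Map a message key to a partition deterministically, so every
    message with the same key lands on the same partition."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All events for order-42 go to one partition, hence one consumer,
# so "in transit" is always consumed before "delivered".
p1 = partition_for("order-42", 8)
p2 = partition_for("order-42", 8)
assert p1 == p2
```

Because the mapping depends only on the key, ordering is preserved per order while different orders can still be spread across partitions and consumed in parallel.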

Regarding the aggregate id, I think it is analogous to a topic in Kafka. Since the author assumed an RDBMS store, he needs some identifier to segregate the different categories of messages. In Kafka, you do that by creating separate topics, and also a consumer group per aggregate.

Upvotes: 0
