Reputation: 4002
I am building Microservices. One of my MicroService is using CQRS and Event sourcing. Integration events are raised in the system and i am saving my aggregates in event store also updating my read model.
My questions is why we need version in aggregate when we are updating the event stream against that aggregate ? I read we need this for consistency and events are to be replayed in sequence and we need to check version before saving (https://blog.leifbattermann.de/2017/04/21/12-things-you-should-know-about-event-sourcing/) I still can't get my head around this since events are raised and saved in order , so i really need concrete example to understand what benefit we get from version and why we even need them.
Many thanks,
Imran
Upvotes: 11
Views: 6228
Reputation: 1451
Let me describe a case where aggregate versions are useful:
In our reSove framework aggregate version is used for optimistic concurrency control.
I'll explain it by example. Let's say InventoryItem
aggregate accept commands AddItems
and OrderItems
. AddItems
increases number of items in stock, OrderItems
- decreases.
Suppose you have an InventoryItem
aggregate #123 with one event - ITEMS_ADDED
with quantity of 5. Aggregate #123 state say there are 5 items in stock.
So your UI is showing users that there are 5 items in stock. User A decide to order 3 items, user B - 4 items. Both issue OrderItems
commands, almost at the same time, let's say user A is first by couple milliseconds.
Now, if you have a single instance of aggregate #123 in memory, in the single thread, you don't have a problem - first command from user A would succeed, event would be applied, state say quantity is 2, so second command from user B would fail.
In a distributed or serverless system where commands from A and B would be in separate processes, both commands would succeed and bring aggregate into incorrect state if we don't use some concurrency control. There several ways to do this - pessimistic locking, command queue, aggregate repository or optimistic locking.
Optimistic locking seems to be simplest and most practical solution:
We say that every aggregate has a version - number of events in its stream. So our aggregate #123 has version 1.
When aggregate emits an event, this event data has an aggregate version. In our case ITEMS_ORDERED
events from users A and B will have event aggregate version of 2. Obviously, aggregate events should have versions to be sequentially increasing. So what we need to do is just put a database constraint that tuple {aggregateId, aggregateVersion}
should be unique on write to event store.
Let's see how our example would work in a distributed system with optimistic concurrency control:
User A issues a command OrderItem
for aggregate #123
Aggregate #123 is restored from events {version 1, quantity 5}
User B issues a command OrderItem
for aggregate #123
Another instance of Aggregate #123 is restored from events (version 1, quantity 5)
Instance of aggregate for user A performs a command, it succeeds, event ITEMS_ORDERED {aggregateId 123, version 2}
is written to event store.
Instance of aggregate for user B performs a command, it succeeds, event ITEMS_ORDERED {aggregateId 123, version 2}
it attempts to write it to event store and fails with concurrency exception.
On such exception command handler for user B just repeats the whole procedure - then Aggregate #123 would be in a state of {version 2, quantity 2}
and command will be executed correctly.
I hope this clears the case where aggregate versions are useful.
Upvotes: 23
Reputation: 9283
Yes, this is right. You need the version or a sequence number for consistency.
Two things you want:
Correct ordering
Usually events are idempotent in nature because in a distributed system idempotent messages or events are easier to deal with. Idempotent messages are the ones that even when applied multiple times will give the same result. Updating a register with a fixed value (say one) is idempotent, but incrementing a counter by one is not. In distributed systems when A sends a message to B, B acknowledges A. But if B consumes the message and due to some network error the acknowledgement to A is lost, A doesn't know if B received the message and so it sends the message again. Now B applies the message again and if the message is not idempotent, the final state will go wrong. So, you want idempotent message. But if you fail to apply these idempotent messages in the same order as they are produced, your state will be again wrong. This ordering can be achieved using the version id or a sequence. If your event store is an RDBMS you cannot order your events without any similar sort key. In Kafka also, you have the offset id and client keeps track of the offset up to which it has consumed
Deduplication
Secondly, what if your messages are not idempotent? Or what if your messages are idempotent but the consumer invokes some external services in a non-deterministic way. In such cases, you need an exactly-once semantics because if you apply the same message twice, your state will be wrong. Here also you need the version id or sequence number. If at the consumer end, you keep track of the version id you have already processed, you can dedupe based on the id. In Kafka, you might then want to store the offset id at the consumer end
Further clarifications based on comments:
The author of the article in question assumed an RDBMS as an event store. The version id or the event sequence is expected to be generated by the producer. Therefore, in your example, the "delivered" event will have a higher sequence than the "in transit" event.
The problem happens when you want to process your events in parallel. What if one consumer gets the "delivered" event and the other consumer gets the "in transit" event? Clearly you have to ensure that all events of a particular order are processed by the same consumer. In Kafka, you solve this problem by choosing order id as the partition key. Since one partition will be processes by one consumer only, you know you'll always get the "in transit" before delivery. But multiple orders will be spread across different consumers within the same consumer group and thus you do parallel processing.
Regarding aggregate id, I think this is synonymous to topic in Kafka. Since the author assumed RDBMS store, he needs some identifier to segregate different categories of message. You do that by creating separate topics in Kafka and also consumer groups per aggregate.
Upvotes: 0