Reputation: 982
In Axon's reference guide it is written that
Besides these provided policies, you can define your own. All policies must implement the SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently.
Even more, in this thread's last message it says that
with the sequencing policy, you indicate which events need to be processed sequentially. It doesn't matter whether the threads are in the same JVM, or in different ones. If the sequencing policy returns the same value for 2 messages, they will be guaranteed to be processed sequentially, even if you have tracking processor threads across multiple JVMs.
So does this mean that event processors are actually stateless? If yes, then how do they manage to synchronise? Is the token store used for this purpose?
Upvotes: 2
Views: 1047
Reputation: 7275
I think this depends on what you count as state, but from the point of view you're looking at it from, yes, the EventProcessor implementations in Axon are indeed stateless.
The SubscribingEventProcessor receives its events from a SubscribableMessageSource (the EventBus implements this interface) as they occur.
The TrackingEventProcessor retrieves its events from a StreamableMessageSource (the EventStore implements this interface) at its own leisure.
The latter therefore needs to keep track of where it is in the event stream. This information is stored in a TrackingToken, which is saved by the TokenStore.
A given TrackingEventProcessor thread can only handle events if it has laid a claim on the TrackingToken for the processing group it is part of. This ensures that the same event isn't handled by two distinct threads, which could accidentally update the same query model.
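The claim mechanism can be pictured roughly like this. Note this is a minimal sketch: the InMemoryClaimStore class and its tryClaim/release methods are hypothetical stand-ins to illustrate the idea, not Axon's actual TokenStore API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration: a store where each segment's token can be
// claimed by at most one processor thread at a time.
class InMemoryClaimStore {
    private final ConcurrentMap<Integer, String> claims = new ConcurrentHashMap<>();

    /** Returns true if 'owner' obtained (or already holds) the claim on the segment. */
    boolean tryClaim(int segment, String owner) {
        // computeIfAbsent atomically installs 'owner' only if nobody holds the claim.
        return owner.equals(claims.computeIfAbsent(segment, s -> owner));
    }

    /** Releases the claim only if 'owner' actually holds it. */
    void release(int segment, String owner) {
        claims.remove(segment, owner);
    }
}
```

Only the thread holding the claim on a segment's token may process events for that segment; a second thread attempting the same claim simply fails and moves on, which is what prevents double-handling.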
The TrackingToken also allows multithreading this process, which is done by segmenting the token. The number of segments (adjustable through the initialSegmentCount property) drives the number of pieces the TrackingToken for a given processing group will be partitioned into. From the point of view of the TokenStore, this means you'll have as many TrackingToken instances stored as the number of segments you've configured.
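To make the partitioning concrete, here is a rough sketch of how a sequence identifier could be mapped onto one of the configured segments. The SegmentAssigner class is an assumption of mine for illustration; Axon's actual matching goes through its Segment abstraction rather than a plain modulo, but the hash-based idea is the same.

```java
// Hypothetical sketch: assign a sequence identifier to a segment by hashing.
class SegmentAssigner {
    private final int segmentCount;

    SegmentAssigner(int segmentCount) {
        this.segmentCount = segmentCount;
    }

    int segmentFor(Object sequenceIdentifier) {
        // Mask off the sign bit so the result is always non-negative.
        return (sequenceIdentifier.hashCode() & Integer.MAX_VALUE) % segmentCount;
    }
}
```

Because the mapping is deterministic, the same sequence identifier always lands in the same segment, regardless of which thread or JVM computes it.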
The SequencingPolicy's job is to decide which events in a stream belong to which segment. You could, for example, use the SequentialPerAggregatePolicy to ensure all events with a given aggregate identifier are handled by one segment.
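As the reference guide quoted in the question says, a custom policy only needs to implement getSequenceIdentifierFor. Below is a self-contained sketch of a per-aggregate policy; the SequencingPolicy interface and DomainEvent record here are simplified stand-ins for Axon's actual types so the example compiles on its own.

```java
// Simplified stand-in for Axon's SequencingPolicy interface.
interface SequencingPolicy<T> {
    Object getSequenceIdentifierFor(T event);
}

// Simplified stand-in for an event message carrying an aggregate identifier.
record DomainEvent(String aggregateIdentifier, String payload) { }

// Events of the same aggregate get the same sequence identifier, so they
// are processed sequentially; events of different aggregates may be
// processed concurrently.
class PerAggregatePolicy implements SequencingPolicy<DomainEvent> {
    @Override
    public Object getSequenceIdentifierFor(DomainEvent event) {
        return event.aggregateIdentifier();
    }
}
```

Returning a constant from getSequenceIdentifierFor would instead force fully sequential processing, while returning the event's own identity would allow maximal concurrency.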
Upvotes: 2