Golo Roden

Reputation: 150614

Relation between command handlers, aggregates, the repository and the event store in CQRS

I'd like to understand some details of the relations between command handlers, aggregates, the repository and the event store in CQRS-based systems.

What I've understood so far:

So far, so good. Now there are some issues that I did not yet get:

Upvotes: 43

Views: 8233

Answers (3)

Dennis Traub

Reputation: 51624

The following is based on my own experience and my experiments with various frameworks like Lokad.CQRS, NCQRS, etc. I'm sure there are multiple ways to handle this. I'll post what makes most sense to me.

1. Aggregate Creation:

Every time a command handler needs an aggregate, it uses a repository. The repository retrieves the respective list of events from the event store and calls an overloaded constructor, injecting the events:

var stream = eventStore.LoadStream(id)
var user = new User(stream.Events)

If the aggregate didn't exist before, the stream will be empty and the newly created object will be in its original state. You might want to make sure that in this state only a few commands are allowed to bring the aggregate to life, e.g. User.Create().

2. Storage of new Events

Command handling happens inside a Unit of Work. During command execution every resulting event will be added to a list inside the aggregate (User.Changes). Once execution is finished, the changes will be appended to the event store. In the example below this happens in the following line:

store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
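Passing stream.Version to the append suggests an optimistic concurrency check: the write only succeeds if nobody else has appended to the stream since it was loaded. Here is a minimal in-memory sketch in Python, assuming that interpretation. InMemoryEventStore, ConcurrencyError and the method names are hypothetical and not taken from any of the frameworks mentioned above:

```python
from dataclasses import dataclass, field


class ConcurrencyError(Exception):
    """Raised when the stream changed since it was loaded."""


@dataclass
class Stream:
    events: list = field(default_factory=list)
    version: int = 0  # number of events already stored


class InMemoryEventStore:
    def __init__(self):
        self._streams = {}  # stream id -> list of events

    def load_stream(self, stream_id):
        events = list(self._streams.get(stream_id, []))
        return Stream(events=events, version=len(events))

    def append_to_stream(self, stream_id, expected_version, changes):
        stored = self._streams.setdefault(stream_id, [])
        # Reject the write if another writer appended in the meantime.
        if len(stored) != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {len(stored)}"
            )
        stored.extend(changes)


store = InMemoryEventStore()
first = store.load_stream("user-1")
second = store.load_stream("user-1")  # a concurrent reader of the same stream

store.append_to_stream("user-1", first.version, ["UserCreated"])
try:
    store.append_to_stream("user-1", second.version, ["UserCreated"])
    conflict_detected = False
except ConcurrencyError:
    conflict_detected = True
```

The second append fails because it still expects the empty stream it loaded, so the command can be retried against the fresh state instead of silently overwriting it.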

3. Order of Events

Just imagine what would happen if two subsequent CustomerMoved events were replayed in the wrong order.
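To make this concrete, here is a small Python sketch that replays two moves in both orders. The shape of CustomerMoved and the replay helper are assumptions made purely for illustration:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CustomerMoved:
    new_city: str


def replay(events):
    """Fold the events, in the given order, into the current city."""
    city = None
    for event in events:
        city = event.new_city  # a later event overwrites an earlier one
    return city


moved_to_berlin = CustomerMoved("Berlin")
moved_to_hamburg = CustomerMoved("Hamburg")

correct = replay([moved_to_berlin, moved_to_hamburg])
swapped = replay([moved_to_hamburg, moved_to_berlin])
```

Replayed in the recorded order, the customer ends up in Hamburg. With the two events swapped, the aggregate reports Berlin, a city the customer has already left.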

An Example

I'll try to illustrate this with a piece of pseudo-code (I deliberately left repository concerns inside the command handler to show what would happen behind the scenes):

Application Service:

UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(cmd.Reason)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

Aggregate:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        isNewEvent = false
        foreach (event in eventStream)
            this.Apply(event, isNewEvent)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        isNewEvent = true
        this.Apply(new UserCreated(...), isNewEvent)

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        isNewEvent = true
        this.Apply(new UserBlocked(...), isNewEvent)

    Apply(userCreatedEvent, isNewEvent)
        this.created = true
        if (isNewEvent) this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent, isNewEvent)
        this.blocked = true
        if (isNewEvent) this.Changes.Add(userBlockedEvent)
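
For readers who prefer something they can run, the aggregate above translates roughly into the following Python sketch. It is one possible rendering, not the way any particular framework does it: the event classes and their fields are assumptions, a single _apply method with type checks stands in for the overloaded Apply methods, and a plain list stands in for the event store.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserCreated:
    user_name: str


@dataclass(frozen=True)
class UserBlocked:
    reason: str


class User:
    def __init__(self, event_stream=()):
        self.created = False
        self.blocked = False
        self.changes = []  # new events that are not stored yet
        for event in event_stream:
            self._apply(event, is_new=False)

    def create(self, user_name):
        if self.created:
            raise ValueError("User already exists")
        self._apply(UserCreated(user_name), is_new=True)

    def block(self, reason):
        if not self.created:
            raise ValueError("No such user")
        if self.blocked:
            raise ValueError("User is already blocked")
        self._apply(UserBlocked(reason), is_new=True)

    def _apply(self, event, is_new):
        # Dispatch on the event type; replaces the overloaded Apply methods.
        if isinstance(event, UserCreated):
            self.created = True
        elif isinstance(event, UserBlocked):
            self.blocked = True
        if is_new:
            self.changes.append(event)


# First command: create the user from an empty stream.
user = User()
user.create("alice")
stored_events = list(user.changes)  # stands in for AppendToStream

# Second command: rehydrate from the stored events, then block.
rehydrated = User(stored_events)
rehydrated.block("spam")
```

After rehydration, only the new UserBlocked event sits in changes; the replayed UserCreated event is not recorded a second time.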

Update:

As a side note: Yves' answer reminded me of an interesting article by Udi Dahan from a couple of years ago:

Upvotes: 41

Constantin Galbenu

Reputation: 17683

I mostly agree with yves-reynhout and dennis-traub, but I want to show you how I do this. I want to strip my aggregates of the responsibility to apply the events to themselves or to re-hydrate themselves; otherwise there is a lot of code duplication, because every aggregate constructor looks the same:

UserAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


OrderAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


ProfileAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)

Those responsibilities could be left to the command dispatcher. The command is handled directly by the aggregate.

Command dispatcher class

    dispatchCommand(command) method:
        // wrap the call so that the command is available on every retry
        newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(() => tryToDispatchCommand(command))
        EventDispatcher.dispatchEvents(newEvents)

    tryToDispatchCommand(command) method:
        aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
        aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
        newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
        AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)
        return newEvents

ConcurentProofFunctionCaller class

    executeFunctionUntilSucceeds(pureFunction) method:
        do this n times
            try
                call result=pureFunction()
                return result
            catch(ConcurentWriteException)
                continue
        throw TooManyRetries    

AggregateRepository class

     loadAggregate(aggregateClass, aggregateId) method:
         aggregate = new aggregateClass
         priorEvents = EventStore.loadEvents(aggregateClass, aggregateId)
         this.applyEventsOnAggregate(aggregate, priorEvents)
         return aggregate

     saveAggregate(aggregateId, aggregate, newEvents) method:
        this.applyEventsOnAggregate(aggregate, newEvents)
        // priorEvents.version is the version remembered from loadAggregate
        EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)

SomeAggregate class
    handleCommand1(command1) method:
        return new SomeEvent or throw someException BUT don't change state!
    applySomeEvent(SomeEvent) method:
        changeStateSomehow() and not throw any exception and don't return anything!

Keep in mind that this is pseudo-code projected from a PHP application; the real code should have things injected and other responsibilities refactored out into other classes. The idea is to keep aggregates as clean as possible and avoid code duplication.

Some important aspects about aggregates:

  1. command handlers should not change state; they yield events or throw exceptions
  2. event applies should not throw any exception and should not return anything; they only change internal state
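
The two rules above, together with the retry loop, can be sketched in Python as follows. This is a toy illustration rather than the PHP implementation: the Counter aggregate, the tuple-shaped events, the simulated conflict and every name here are assumptions.

```python
class ConcurrentWriteError(Exception):
    """Hypothetical stand-in for the ConcurentWriteException above."""


class TooManyRetries(Exception):
    pass


def execute_until_succeeds(func, attempts=3):
    """Call func again after a concurrent write, up to `attempts` times."""
    for _ in range(attempts):
        try:
            return func()
        except ConcurrentWriteError:
            continue
    raise TooManyRetries()


class Counter:
    """Toy aggregate: handlers decide, appliers mutate."""

    def __init__(self):
        self.value = 0

    def handle_increment(self, amount):
        # Command handler: validates and returns events, never changes state.
        if amount <= 0:
            raise ValueError("amount must be positive")
        return [("Incremented", amount)]

    def apply_incremented(self, amount):
        # Event applier: changes state, never throws, returns nothing.
        self.value += amount


calls = {"count": 0}


def try_to_dispatch():
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConcurrentWriteError()  # simulate a conflicting writer once
    aggregate = Counter()  # a fresh aggregate is built on every attempt
    new_events = aggregate.handle_increment(5)
    state_after_handle = aggregate.value
    for _name, amount in new_events:
        aggregate.apply_incremented(amount)
    return new_events, state_after_handle, aggregate.value


new_events, state_after_handle, state_after_apply = execute_until_succeeds(
    try_to_dispatch
)
```

Because the handler leaves the state untouched and each attempt starts from a freshly loaded aggregate, repeating the whole function after a conflict is safe.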

An open-source PHP implementation of this can be found here.

Upvotes: 1

Yves Reynhout

Reputation: 2990

A small variation on Dennis' excellent answer:

  • When dealing with "creational" use cases (i.e. that should spin off new aggregates), try to find another aggregate or factory you can move that responsibility to. This does not conflict with having a ctor that takes events to hydrate (or any other mechanism to rehydrate for that matter). Sometimes the factory is just a static method (good for "context"/"intent" capturing), sometimes it's an instance method of another aggregate (good place for "data" inheritance), sometimes it's an explicit factory object (good place for "complex" creation logic).
  • I like to provide an explicit GetChanges() method on my aggregate that returns the internal list as an array. If my aggregate is to stay in memory beyond one execution, I also add an AcceptChanges() method to indicate the internal list should be cleared (typically called after things were flushed to the event store). You can use either a pull (GetChanges/Changes) or push (think .NET event or IObservable) based model here. Much depends on the transactional semantics, tech, needs, etc.
  • Your eventstream is a linked list: each revision (event/changeset) points to the previous one (a.k.a. the parent). It is a sequence of events/changes that happened to a specific aggregate. The order only needs to be guaranteed within the aggregate boundary.
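
The GetChanges()/AcceptChanges() pair from the second point could look roughly like this in Python. It is a sketch of the pull model only; the Account aggregate and its tuple-shaped events are assumptions made for illustration.

```python
class Account:
    def __init__(self):
        self._changes = []  # events recorded since the last flush

    def deposit(self, amount):
        self._changes.append(("Deposited", amount))

    def get_changes(self):
        # Return a copy so callers cannot mutate the internal list.
        return list(self._changes)

    def accept_changes(self):
        # Call once the changes have been flushed to the event store.
        self._changes.clear()


account = Account()
account.deposit(10)
flushed = account.get_changes()  # pull model: the caller asks for changes
account.accept_changes()

# The aggregate stays in memory and handles another command.
account.deposit(5)
pending = account.get_changes()
```

Clearing the list after the flush keeps a long-lived aggregate from handing the same events to the event store twice.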

Upvotes: 11
