Reputation: 23
I'm trying to create a command handler that treats the command as a number of sub-commands. Each sub-command will generate an event (which should then update the state of the aggregate). The processing of each sub-command relies on the state of the aggregate being up-to-date (from the previous sub-command).
For example, consider the following aggregate:
package axon.poc
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventsourcing.EventSourcingHandler
import org.axonframework.modelling.command.AggregateIdentifier
import org.axonframework.modelling.command.AggregateLifecycle
import org.axonframework.spring.stereotype.Aggregate
import org.slf4j.LoggerFactory
import java.util.UUID
@Aggregate
class Aggregate() {
companion object {
private val logger = LoggerFactory.getLogger(Aggregate::class.java)
}
@AggregateIdentifier
internal var aggregateId: UUID? = null
private var value: Int = 0
@CommandHandler
constructor(command: Command): this() {
logger.info("generating create event")
var applyMore = AggregateLifecycle.apply(CreatedEvent(command.aggregateId))
for (i in 0 until command.value) {
applyMore = applyMore.andThenApply {
logger.info("generating update event: ${value+1}")
UpdatedEvent(command.aggregateId, value+1)
}
}
logger.info("completed command handler")
}
@EventSourcingHandler
fun on(event: CreatedEvent) {
logger.info("event sourcing handler: $event")
this.aggregateId = event.aggregateId
this.value = 0
}
@EventSourcingHandler
fun on(event: UpdatedEvent) {
logger.info("event sourcing handler: $event")
this.value = event.value
}
}
When Command(value = 2)
is handled by this code, it generates
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating create event
[main] INFO org.axonframework.spring.stereotype.Aggregate - completed command handler
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: CreatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
The first event (CreatedEvent) is being handled before the applyMore
is executed. However, even though the applyMore
is chained, the UpdatedEvent
's are not handled by the event sourcing handler until both have been generated.
I was expecting (and hoping for):
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating create event
[main] INFO org.axonframework.spring.stereotype.Aggregate - completed command handler
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: CreatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 2
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=2)
Is this a bug with Axon? Or am I misunderstanding something with how it should be used? How can a number of "commands" be handled atomically? ie. all pass or all fail.
Upvotes: 2
Views: 786
Reputation: 2900
TLDR; It's a bug.
You've hit a very specific situation that only occurs in constructors. The challenge, from an Axon perspective, is that you need an instance to apply events on. However, that instance is only available once the constructor is complete.
The andThenApply
function is exactly provided for this purpose (and you're using it correctly). However, the code is (erroneously) evaluated a little too soon in your case. I had to run your code locally and debug through to find out what was exactly happening.
The root cause is that the andThenApply
implementation of AnnotatedAggregate
calls apply
, instead of publish
. The former will see it's currently executing delayed tasks, and schedules the actual publication at the end of these tasks. The next task does the same. Therefore both events end up being created first, and then published after they have been both created.
Would you be interested in filing this as an issue in the Axon issue tracker? That way, credits go where they belong.
Upvotes: 2