matpiera

Reputation: 13

Axon Saga subscribing mode concurrency issue

I'm seeing unexpected behaviour when switching from tracking to subscribing mode in my Saga using Axon 4.3.5.

It seems that, in subscribing mode, two sagas are created for the same association key-value pair when two threads reach two @StartSaga methods simultaneously. Am I missing something?

I've got this to reproduce it:

@Saga
@ProcessingGroup("Saga")
public class RaceSaga {

    @Inject
    protected transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Exec exec) {
        commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(), exec.getDescription()));
    }

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Risk risk) {
        commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(), risk.getResult()));
    }
}

@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {

    @Autowired
    private EventGateway eventGateway;
    @Autowired
    private SagaStore sagaStore;

    @Test
    void sagaRace() {
        var execId = UUID.randomUUID();

        CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(), "desc")));
        CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(), "OK")));

        var association = new AssociationValue("executionId", execId.toString());
        await().during(5, SECONDS)
                .untilAsserted(() -> assertThat(sagaStore.findSagas(RaceSaga.class, association))
                        .hasSize(1));
    }
}

The test passes when using tracking mode but fails with subscribing. (yml config)

Upvotes: 0

Views: 222

Answers (1)

Steven

Reputation: 7275

To be honest, this is expected behavior given the test setup, but it requires some explanation.

Know that the following is the main difference between Subscribing (SEP) and Tracking Event Processors (TEP):

  • SubscribingEventProcessor - Invoked in the thread which published the event on the EventBus, resembling a push mechanism.
  • TrackingEventProcessor - Invoked in a separate thread retrieving events from the EventStore, resembling a pull mechanism.

Because a TEP handles events from its own thread, it ensures the event handling order in this case, regardless of how concurrently the events are published.
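The push versus pull distinction can be sketched in plain Java. This is a simplified model, not Axon code: `PushProcessor`, `PullProcessor`, and the `tracking-worker` thread name are hypothetical names chosen for illustration, and an in-memory queue stands in for the EventStore.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Simplified model of the two processor styles; these are NOT Axon classes.
class ProcessorModes {

    // Push (subscribing style): the handler runs on the publishing thread.
    static final class PushProcessor {
        private final Consumer<String> handler;

        PushProcessor(Consumer<String> handler) {
            this.handler = handler;
        }

        void publish(String event) {
            handler.accept(event);
        }
    }

    // Pull (tracking style): publishing only appends to a store; one worker
    // thread takes events from the store in order and invokes the handler.
    static final class PullProcessor {
        private final BlockingQueue<String> store = new LinkedBlockingQueue<>();

        PullProcessor(Consumer<String> handler) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        handler.accept(store.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "tracking-worker");
            worker.setDaemon(true);
            worker.start();
        }

        void publish(String event) {
            store.add(event);
        }
    }

    // Name of the thread that handles an event in push mode.
    static String pushHandlerThread() {
        CompletableFuture<String> seen = new CompletableFuture<>();
        new PushProcessor(e -> seen.complete(Thread.currentThread().getName())).publish("event");
        return seen.join();
    }

    // Name of the thread that handles an event in pull mode.
    static String pullHandlerThread() {
        CompletableFuture<String> seen = new CompletableFuture<>();
        new PullProcessor(e -> seen.complete(Thread.currentThread().getName())).publish("event");
        return seen.join();
    }
}
```

In this model, `pushHandlerThread()` reports the caller's own thread, while `pullHandlerThread()` reports the dedicated worker, which is why two concurrent publishers reach a push-style handler on two different threads.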

When it comes to the SEP, things are slightly different, and to see why we need to dive into the implementation a little. Publishing two or more events at once isn't unusual; plenty of Aggregate implementations do this, given the right requirement within the domain. The framework groups such multi-event transactions into one batch, and it uses the UnitOfWork to do so. If you enter a command handling function of an Aggregate, for example, you are guaranteed that a UnitOfWork is active to coordinate the life cycle. One of its tasks is to pair up the events in one batch for publication.

In your test case, however, you are using the EventGateway directly. That is inherently perfectly fine, but the test is set up without starting a UnitOfWork to coordinate these two events so that they happen in order. Diving through the code to see how publication to a SEP works, you end up in the AbstractEventBus. When EventBus#publish(List<EventMessage>) is invoked, it checks whether a UnitOfWork is active. If so, the events are added to the right phase of the UnitOfWork.

When no UnitOfWork (UoW) is active though, the handlers will be invoked immediately.
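The publish decision described above can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the framework's implementation: `UnitOfWorkModel` and `inUnitOfWork` are hypothetical names, a thread-local list stands in for the active UnitOfWork, and "commit" is reduced to handing the queued events to the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of "queue if a unit of work is active, else handle now".
class UnitOfWorkModel {
    // Events queued by the unit of work active on the current thread,
    // or null when no unit of work is active (hypothetical stand-in).
    private static final ThreadLocal<List<String>> ACTIVE = new ThreadLocal<>();

    private final Consumer<String> handler;

    UnitOfWorkModel(Consumer<String> handler) {
        this.handler = handler;
    }

    void publish(String event) {
        List<String> queued = ACTIVE.get();
        if (queued != null) {
            queued.add(event);     // deferred until the unit of work commits
        } else {
            handler.accept(event); // no unit of work: handled immediately
        }
    }

    // Runs the task inside a unit of work, then hands over the queued events.
    void inUnitOfWork(Runnable task) {
        List<String> queued = new ArrayList<>();
        ACTIVE.set(queued);
        try {
            task.run();
        } finally {
            ACTIVE.remove();
        }
        queued.forEach(handler); // "commit": events are handled as one batch
    }

    // Records the order in which publishing and handling happen.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        UnitOfWorkModel bus = new UnitOfWorkModel(e -> log.add("handled " + e));
        bus.publish("A");
        log.add("published A");
        bus.inUnitOfWork(() -> {
            bus.publish("B");
            bus.publish("C");
            log.add("published B and C");
        });
        return log;
    }
}
```

Note that the unit of work in this model is bound to one thread, so it only batches events published from that thread.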

So, when using a TrackingEventProcessor, it is the framework which consciously starts a UoW to batch the events and handle them in order. When using a SubscribingEventProcessor, this work is left to the user, on the assumption that users will typically move through the regular flow of [command handling -> event publication -> event handling], which ensures a UoW is active. Since that is not the case in your integration test, both publish operations immediately invoke the RaceSaga's SagaManager, creating two instances due to concurrency.
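The resulting race is a classic check-then-create problem, which the following plain-Java sketch reproduces. It is an illustrative model only and does not reflect how the SagaManager is actually written: `SagaRaceModel` and `onStartEvent` are hypothetical, and the latch exists purely to force the unlucky interleaving so the outcome is repeatable.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model of two start events racing to create a saga.
class SagaRaceModel {
    private final List<String> sagaKeys = new CopyOnWriteArrayList<>();

    // Check-then-create without locking. When a latch is given, each caller
    // waits after its check until all callers have checked as well.
    void onStartEvent(String key, CountDownLatch allChecked) {
        boolean exists = sagaKeys.contains(key);
        if (allChecked != null) {
            allChecked.countDown();
            try {
                allChecked.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (!exists) {
            sagaKeys.add(key);
        }
    }

    // Push style: two publishing threads each invoke the handler themselves.
    static int sagasAfterConcurrentPush() {
        SagaRaceModel model = new SagaRaceModel();
        CountDownLatch allChecked = new CountDownLatch(2);
        Thread exec = new Thread(() -> model.onStartEvent("exec-1", allChecked));
        Thread risk = new Thread(() -> model.onStartEvent("exec-1", allChecked));
        exec.start();
        risk.start();
        join(exec);
        join(risk);
        return model.sagaKeys.size();
    }

    // Pull style: a single thread handles both events one after the other.
    static int sagasAfterOrderedHandling() {
        SagaRaceModel model = new SagaRaceModel();
        Thread worker = new Thread(() -> {
            model.onStartEvent("exec-1", null);
            model.onStartEvent("exec-1", null);
        });
        worker.start();
        join(worker);
        return model.sagaKeys.size();
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With the forced interleaving, both threads see "no saga yet" and each creates one, whereas ordered handling on a single thread creates one saga and then finds it for the second event.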

Note that it is recommended to use the TEP for processes like these. Using a SEP for Sagas could mean you lose some events during a (faulty) shutdown of your application. As the SEP is a push mechanism, there is no means to recover from these "lost" (from the perspective of your event processor) events. A TEP resolves this issue, as it handles events of its own accord and keeps track of its progress.

I trust this clarifies things for you, @matpiera.

Upvotes: 0
