Martin
Martin

Reputation: 33

How to replay/project events in Axon4 towards a different context?

I'm building my first event sourced system. It will have multiple domains using projects with a publication lifecycle at it's core. How can I effectively replay or re-apply events of two domains to a new aggregate inside a third domain?

To be more specific. Imagine 4 domains each with their own bounded context and purpose. A short description of these contexts:

The states of publication follow the lifecycle: concept (not yet published) > announced (optional) > sale > sold-out (publication ended). In my description I focus on the announced status. Concept is not actually a thin for the publication domain since a project is always in concept if publication does not know about it yet.


My first attempt was setting up a normal aggregate which handled the incoming event AnnouncementPublishedEvent. This requires a project to meet some basic requirements like 'it has a name', 'it has a description', 'it has at least one image' and so on. This means I need to validate this information before the event is applied and therefore I somehow need to supply a project instance in the command.

While doing this I suspected this method breaks the purpose of CQRS and I should be looking at the real data source: events. My next attempt was creating a Saga that starts when the event AnnouncementPublicationRequestedEvent. This saga needs to review which events occured around the given projectId and apply those to this new 'published project' projection in order to (at least) validate if the request can be accepted.

I researched and experimented with tracking processors but could not get a good example how this is done in version 4 of Axon. I also started to read several other questions on Stackoverflow that made me think I might need to reconsider my approach.


Unfortunately, the exact code can not be shared as it's not open source and even if I could it's far from a working state. I can use example code to show what I'm trying to do.

@Saga
@ProcessingGroup("AnnouncementPublication")
public class AnnouncementPublicationSaga {

   private static int NUMBER_OF_ALLOWED_IMAGES

   private PublicationId publicationId;
   private ProjectId projectId;
   private int numberOfImages = 0;
   //...other fields

   @StartSaga
   @SagaEventHandler(associationProperty = "projectId")
   public void handle(AnnouncementPublicationRequestedEvent event) {
      publicationId = generatePublicationId();

      //set parameters from event for saga to use
      projectId = event.getProjectId();
      targetPublicationStatus = event.getPublicationStatus();
      date = event.getDate();

      //initialize the 'publicated project' aggregate
      //start a replay of associated events for this @ProcessingGroup
   }

   ...

   @SagaEventHandler(associationProperty = "projectId")
   public void handle(ProjectCreatedEvent event) {
      //Verify the project exists and has a valid name
   }

   ...

   /* Assumption* on how AssociationResolver works: */
   @SagaEventHandler(AssociationResolver=MediaProjectAssociator.class )
   public void handle(ProjectImageAdded event) {
      numberOfImages += 1;
   }

   /* Assumption* on how AssociationResolver works: */
   @SagaEventHandler(AssociationResolver=MediaProjectAssociator.class )
   public void handle(ProjectImageRemoved event) {
      numberOfImages -= 1;
   }

   ...

   /* In my head this should trigger if all events have been played
      up to the PublicationRequestedEvent. Or maybe 
   */
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(ValidationRequestCompleted event) {
      //ValidationResult result = ValidationResult.builder();
      ...
      if (numberOfImages > NUMBER_OF_ALLOWED_IMAGES) {
         //reason to trigger PublicationRequestDeniedEvent
         //update validationResult
      }
      ...
      if (validationResult.isAcceptable()) {
         //Trigger AnnouncementPublicationAcceptedEvent
      } else {
         //Trigger AnnouncementPublicationDeniedEvent
      }
   }

   ...

   @EndSaga
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(AnnouncementPublicationDeniedEvent event) {
      //do stuff to inform why the publication failed
   }

   @EndSaga
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(AnnouncementPublicationAcceptedEvent event){
      //do stuff to notify success to user
      //choice: delegate to delivery for actual sharing of data
      //    or  delivery itselfs listens for these events
   }
}

*The associationResolver code is an assumption to it's actual working as I'm not even close to that part yet. My media context uses a file id as aggregate identifier as not every event is bound to a project. But all the media events this saga needs to replay will have a projectId as field in them. Any feedback on this is welcome but it's not my main problem now.


In the end the result should be: a record of the publication or a record of the attempt and why it failed.

The record of the publication contains all data from project or media events that are relevant to a publication. This is mostly information that potential buyers need to make a decision.

For the purpose of this question I don't expect the above to be solved completely. I just want to know if I'm on the right track with thinking in events, if my approach on replaying relevant events is the right way to go and if so how this can be done in Axon4.

Upvotes: 3

Views: 606

Answers (1)

Steven
Steven

Reputation: 7275

From your problem description Martin, I assume you have several distinct Bounded Contexts. Following the definition of Bounded Context:

Explicitly define the context within which a model applies.

Explicitly set boundaries in terms of team organization, usage within specific parts of the application, and physical manifestations such as code bases and database schemas.

Keep the model strictly consistent within these bounds, but don’t be distracted or confused by issues outside.

From this I'd like to emphasize that within a given Bounded Context, you speak the same language/API with any component. Between contexts, you will however share very consciously, using dedicated context-mappings like for example an anti-corruption layer to ensure another domain doesn't enter your domain.

Having said the above, events are part of a specific Bounded Context. Thus, using multiple streams of events from other contexts to recreate/replay an aggregate in another context should ideally be out of the question.

On top of this, in Axon an Aggregate can only ever be recreated based on events it has published itself.

To still arrive to a solution where a given application ingests events from other applications to re-hydrate an Aggregate, I would take the following steps:

  • Have a dedicated component (e.g. the anti-corruption layer) which translates the incoming events into a different form of message within your application.
  • If these events should result in the reconstruction of an Aggregate, you are required to make translate the events to commands. The Aggregate infrastructure components in Axon are meant for the Command Model when talking about CQRS.
  • Said Aggregate would then handle the commands, perform some business logic and publish an event (or several) as a result.
  • From here on out, the Framework will deal with replaying all events for the given Aggregate, granted you follow Event Sourcing practices to update the Aggregate's state.

Lastly, I'd like to point out that any specifics provided by Axon around replaying tied to the TrackingEventProcessor are meant for Event Processing on the Query side of a CQRS application.

Hope this clarifies things for you Martin! If not, feel free to comment under this answer and I'll update my response accordingly.

Upvotes: 1

Related Questions