neManiac

Reputation: 355

How to do asynchronous integration event to domain event translation in MartenDB

I have integration events coming from outside my system. I consume them with competing consumers and store them in event streams with MartenDB.

I am trying to read these events in batches, load the current aggregate, and generate domain events, which I would put in a different stream. I would then like to asynchronously project those domain events one by one to build the aggregate and save it.

Why am I trying to do this?

I want the domain events to be clean.

That means fixing ordering issues from competing consumers, throwing away information I don't need from the integration events, combining multiple integration events which should represent an atomic operation into one domain event, and so on.
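
To make the translation step concrete, here is a minimal sketch in plain Python rather than Marten/C#. Everything in it is hypothetical: the `IntegrationEvent` and `DomainEvent` shapes, the `WANTED_FIELDS` set, and in particular the assumption that each integration event carries a producer-assigned `sequence` and an `operation_id` shared by all events of one atomic operation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntegrationEvent:
    aggregate_id: str
    operation_id: str  # assumption: shared by all events of one atomic operation
    sequence: int      # assumption: ordering assigned by the producer
    payload: dict


@dataclass(frozen=True)
class DomainEvent:
    aggregate_id: str
    operation_id: str
    changes: dict


# Hypothetical whitelist: every other payload field is thrown away.
WANTED_FIELDS = {"name", "status"}


def translate(batch):
    """Turn a batch of integration events into clean domain events."""
    # Undo the arrival order produced by competing consumers.
    ordered = sorted(batch, key=lambda e: (e.aggregate_id, e.sequence))

    # Merge all events of one operation into a single set of changes;
    # later events overwrite earlier values for the same field.
    merged = {}
    for event in ordered:
        key = (event.aggregate_id, event.operation_id)
        changes = merged.setdefault(key, {})
        changes.update(
            {k: v for k, v in event.payload.items() if k in WANTED_FIELDS}
        )

    return [DomainEvent(agg, op, changes) for (agg, op), changes in merged.items()]
```

Two integration events belonging to the same operation, arriving in either order, would come out as one domain event containing only the whitelisted fields.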

For out-of-order events (changes arriving in a different order, or the event that creates the aggregate arriving after an event that updates it), the only approach I can see is some form of buffering: the async daemon, plus batching, plus retries if need be.
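
The kind of buffering I have in mind looks roughly like the sketch below. It is plain Python, not Marten code, and all names (`Pending`, `drain`, `max_attempts`) are hypothetical. It also assumes each event carries a producer-assigned `sequence`, which may not hold for every source system.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Pending:
    aggregate_id: str
    sequence: int      # assumption: ordering assigned by the producer
    kind: str          # "created" or "updated"
    attempts: int = 0  # how many batches this event has been held back for


def drain(batch, held, existing_ids, max_attempts=3):
    """Split events into ready, still held, and dead-lettered lists."""
    # Group newly arrived and previously held events per aggregate.
    by_aggregate = {}
    for event in [*held, *batch]:
        by_aggregate.setdefault(event.aggregate_id, []).append(event)

    ready, still_held, dead = [], [], []
    for aggregate_id, events in by_aggregate.items():
        events.sort(key=lambda e: e.sequence)
        # Updates can only be applied once the aggregate exists or its
        # creation event is part of the same group.
        can_apply = aggregate_id in existing_ids or any(
            e.kind == "created" for e in events
        )
        if can_apply:
            ready.extend(events)
            continue
        # Otherwise hold the events for a later batch, up to a retry limit.
        for event in events:
            retried = replace(event, attempts=event.attempts + 1)
            target = dead if retried.attempts >= max_attempts else still_held
            target.append(retried)
    return ready, still_held, dead
```

Each batch would call `drain` with the events held back from the previous call, so an update that arrives before its creation event is released, in sequence order, as soon as the creation event shows up.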

But this seems to be difficult to pull off with custom projections.

I start streams and append events through the Events property on IDocumentOperations, but there is no SaveChanges to call there, and MartenDB seems to save only the changes to documents, not the changes to streams.

At this point I'm looking at writing my own SQL or using CDC to trigger this second stage, and then using Marten for the domain event to domain aggregate projection (stage 3). Maybe I can somehow "skip" integration events that I know are out of order, leave a record of that decision somewhere, and come back to them later? That seems very hacky to me.

This tells me that either I am missing something about how to use MartenDB effectively for this task, or I am trying to do something I shouldn't.

So my question is: which is it, and what should I do in this situation?

Upvotes: 0

Views: 24

Answers (0)
