ViktorZharina

Reputation: 11

spring-kafka fixtured event migration implementation

I have several services, one of which is the source of truth (SOT), with Kafka as the message broker between them. From time to time I need to produce a set of events that will be consumed and applied in the other services, a so-called fixtured event migration.

My fixture file example:

EntityUpdated (topicA)
- id
- relation

RelationUpdated (topicB)
- id
- relation

The classes are Spring entities that are projected into the database after the events are applied:

class Entity : Model {
  val id: Long
  val relation: Relation
}

class Relation : Model {
  val id: Long
}

The current consumer implementation reads the topics in arbitrary order, so the consumer can read data from topicB before topicA, and I hit a case where a message can't be applied because the related entity does not exist yet (RelationUpdated consumed before EntityUpdated).

I have several ideas to fix it:

  1. Pause all partitions/topics and resume them in a specified order, so I can avoid the case where RelationUpdated is consumed before EntityUpdated. After resuming all partitions for all topics I can continue working in arbitrary order. I don't like the switching, but it looks workable.

  2. Put messages that could not be applied into a so-called dead letter queue and replay them again and again until they have all been applied.
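Idea 2 boils down to a replay loop. A minimal in-process sketch of that loop (the real version would round-trip the parked messages through a dead-letter topic; `tryApply` is a hypothetical stand-in for the projection logic):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

class RetryingApplier {

    /**
     * Applies events in passes: events that cannot be applied yet (e.g. the
     * related entity is missing) are parked and retried on the next pass.
     * Stops when everything is applied or a full pass makes no progress.
     * Returns the number of events applied.
     */
    int applyAll(Deque<String> events, Predicate<String> tryApply) {
        int applied = 0;
        boolean progress = true;
        while (!events.isEmpty() && progress) {
            progress = false;
            for (int i = events.size(); i > 0; i--) {
                String event = events.pollFirst();
                if (tryApply.test(event)) {
                    applied++;
                    progress = true; // a newly created entity may unblock parked events
                } else {
                    events.addLast(event); // park for the next pass
                }
            }
        }
        return applied;
    }
}
```

If a full pass applies nothing, the loop stops instead of spinning forever, which is the main thing to get right with this approach.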

Maybe someone has done something similar. I would be happy to hear your ideas.

Upvotes: 0

Views: 112

Answers (1)

Artem Bilan

Reputation: 121552

This is not what Apache Kafka has been designed for. To be clear, messaging in general is about independence and separation of concerns: messages from one source should not impact other messages. What you would like to do with pausing and a DLT has a big impact on the rest of the messages in those topics.

I suggest you take a look at Spring Integration and its aggregator pattern implementation: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator. The idea is: your Entity and Relation are correlated on that id. So, regardless of which arrives first, the aggregator will wait for the second part and only then release both of them to the next processing step. With this integration solution there is no need for consumer pausing and no extra broker round-trips via a DLT.
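A configuration sketch of that aggregator using the Spring Integration Java DSL (assumptions: a `MigrationEvent` payload type exposing the shared `getId()`, an `applyPair` handler, and the topic names from the question; not a complete runnable application):

```java
@Configuration
class PairingFlowConfig {

    @Bean
    IntegrationFlow pairingFlow(ConsumerFactory<String, MigrationEvent> cf) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(cf, "topicA", "topicB"))
                .aggregate(a -> a
                        // correlate EntityUpdated and RelationUpdated by their shared id
                        .correlationStrategy(m -> ((MigrationEvent) m.getPayload()).getId())
                        // release as soon as both halves of the pair have arrived
                        .releaseStrategy(g -> g.size() == 2)
                        .expireGroupsUponCompletion(true))
                // the aggregator releases a List of the two correlated events
                .handle((payload, headers) -> {
                    applyPair((List<MigrationEvent>) payload); // hypothetical projection step
                    return null;
                })
                .get();
    }
}
```

`expireGroupsUponCompletion(true)` removes a completed group so a later event with the same id starts a fresh pair.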

If Spring Integration looks too complicated for you, you can consider implementing your own solution with a local Map: add an entry on the first arrival (computeIfAbsent()) and retrieve it on the second arrival (computeIfPresent()).
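A minimal sketch of that local-Map pairing (assumptions: a hypothetical `Event` type carrying the correlation id; `putIfAbsent`/`remove` are used here, which for this first-arrival/second-arrival pattern do the same job as the `computeIfAbsent()`/`computeIfPresent()` mentioned above):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical event type: both EntityUpdated and RelationUpdated carry the same id.
record Event(String id, String topic) {}

class PairingBuffer {
    private final Map<String, Event> pending = new ConcurrentHashMap<>();

    /**
     * Returns the buffered counterpart if this event completes a pair,
     * or null if the event was stored to wait for its counterpart.
     */
    Event offer(Event event) {
        Event first = pending.putIfAbsent(event.id(), event);
        if (first == null) {
            return null;              // first arrival: buffered
        }
        pending.remove(event.id());   // second arrival: release the pair
        return first;
    }
}
```

When `offer` returns non-null, the caller has both events of the pair and can apply them in the right order, regardless of which topic delivered first.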

Upvotes: 0
