Reputation: 6452
TLDR: I would like to be able to process some urgent messages before others in the same topic.
More explanation :
I'm working on a fairly large application using many spring microservices relying on spring-cloud-stream with the kafka-binder to talk between each other for some tasks.
It can happen that a scheduled task produces lot of events. When these event are consumed intermediary events may be produced and send, and it can take quite some time till all the initial and intermediary messages are consumed and all the work is done.
During this time, if someone wants to manually send an event of the same type, it will be queued and processed only when everything else is done.
I've been asked to see if I can not make these "manual events" be processed in priority, before all the other events.
Usually in this case you see people advocating for duplicating all the topics (topic-normal
and topic-urgent
for example), duplicating the @StreamListener
. This seems a rather complicated option, because we have lots of different topics involved here. We also have some kafka-streams aggregation that would need to be rewritten to take this into account.
With a fellow developer we were thinking of maybe duplicating the consumer groups (consumerGroupNormal
, consumerGroupUrgentusing
) for each topic were urgent messages are needed. I would then add a priority header to each message, duplicate the StreamListener
and consume only messages with priority: urgent
headers in one of the listener, and only messages with priority:normal
in the other listener.
I would also probably create a ChannelInterceptorAdapter to add and read this "priority" header automatically (a bit like Sleuth does it).
But I still would have to add lots of configuration for the binders to write and would need to duplicate the listeners, add the little code that checks the header and executes or not the listener code.
All in all, this looks like quite a bit of work for somehting that looks like quite a common and simple use case that could be useful for other people so I was wondering if there is not a simpler way to do it ?
Upvotes: 0
Views: 720
Reputation: 174584
Using two consumer groups on each topic is pretty much the only way to do it and have two listeners.
You don't however, need a custom channel interceptor; you can use the condition
property something like "headers['foo'].equals('urgent')"
.
Upvotes: 1