Reputation: 1519
I am using a Kafka consumer to read from several topics, and I need one of them to have higher priority. The processing takes a lot of time and there are always many messages in the (low-priority) topics, but I need the messages from the other one to be processed as soon as possible.
It's a similar question to "Does Kafka support priority for topic or message?", but that one is about the old API.
In the new API (0.10.1.1), there are the methods
KafkaConsumer::pause(Collection<TopicPartition>)
KafkaConsumer::resume(Collection<TopicPartition>)
But it's not clear to me how to efficiently detect that there are new messages in the high-priority topic, so that I know when to pause consumption from the other topics.
Any ideas/examples?
Upvotes: 3
Views: 6751
Reputation: 1519
Finally I solved it as dawsaw advised: in the processing loop, I keep track of the end offset and the committed offset for every topic/partition I read from.
Whenever (endOffset - committed) > 0 for any priority topic, I call consumer.pause() for the non-priority topics, and I resume them again once (endOffset - committed) == 0 for all priority topics.
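A minimal sketch of the rule described above, assuming the end offsets come from `consumer.endOffsets(priorityPartitions)` and the committed offsets from `consumer.committed(tp)` for each priority partition. The Kafka calls are left out so the decision logic runs without a broker; the class `PriorityGate`, the enum `Action`, and counting a partition with no committed offset as 0 are hypothetical choices for this example, not part of the Kafka API.

```java
import java.util.Map;

// Hypothetical helper: decides whether the low-priority partitions should be
// paused or resumed, based on the backlog of the priority partitions.
final class PriorityGate {

    enum Action { PAUSE_LOW, RESUME_LOW, NONE }

    private PriorityGate() {}

    // True if any priority partition has (endOffset - committed) > 0.
    // endOffsets: e.g. the result of consumer.endOffsets(priorityPartitions).
    // committed:  last committed offset per partition; a missing entry
    //             (consumer.committed(tp) returned null) is treated as 0,
    //             which is an assumption of this sketch.
    static <K> boolean hasPriorityBacklog(Map<K, Long> endOffsets, Map<K, Long> committed) {
        for (Map.Entry<K, Long> entry : endOffsets.entrySet()) {
            Long c = committed.get(entry.getKey());
            long committedOffset = (c == null) ? 0L : c;
            if (entry.getValue() - committedOffset > 0) {
                return true;
            }
        }
        return false;
    }

    // Pause the low-priority partitions as soon as any priority partition has
    // a backlog; resume them only once every priority partition is caught up.
    static Action decide(boolean lowPaused, boolean priorityBacklog) {
        if (priorityBacklog && !lowPaused) {
            return Action.PAUSE_LOW;   // caller: consumer.pause(lowPartitions)
        }
        if (!priorityBacklog && lowPaused) {
            return Action.RESUME_LOW;  // caller: consumer.resume(lowPartitions)
        }
        return Action.NONE;
    }
}
```

In the processing loop you would evaluate `hasPriorityBacklog` before each `poll()`, apply the returned action with `consumer.pause(...)` or `consumer.resume(...)`, and remember whether the low-priority partitions are currently paused. `endOffsets()` asks the broker for the latest offsets, so calling it on every iteration may add noticeable overhead.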
Upvotes: 7
Reputation: 10065
I guess you could use a mix of the position() and committed() methods. The position() method gets the offset of the next record that will be fetched, and the committed() method gets the last committed offset for the given partition (as described in the documentation). Before polling the lower-priority topics, you could check position() and committed() for the higher-priority topic. If position() is higher than committed(), you could pause() the lower-priority topics, poll() the higher-priority one, and then resume the lower-priority topics.
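A minimal sketch of the comparison this answer describes, with the Kafka calls factored out so it runs on plain maps. It assumes the positions come from `consumer.position(tp)` and the committed offsets from `consumer.committed(tp).offset()` for each high-priority partition; `PositionCheck` and `hasUncommittedHighPriority` are hypothetical names. Note that `position()` only advances once `poll()` has returned records, so `position() > committed()` flags high-priority records that were fetched but not yet committed, rather than records still waiting on the broker.

```java
import java.util.Map;

// Hypothetical helper for the position()/committed() comparison.
final class PositionCheck {

    private PositionCheck() {}

    // True if any high-priority partition has a position ahead of its
    // committed offset, i.e. records were fetched but not committed yet.
    // positions: consumer.position(tp) per high-priority partition.
    // committed: consumer.committed(tp).offset() per partition; a missing
    //            entry (nothing committed yet) is treated as 0 in this sketch.
    static <K> boolean hasUncommittedHighPriority(Map<K, Long> positions, Map<K, Long> committed) {
        return positions.entrySet().stream().anyMatch(e -> {
            Long c = committed.get(e.getKey());
            long committedOffset = (c == null) ? 0L : c;
            return e.getValue() > committedOffset;
        });
    }
}
```

If the check returns true, the sequence from the answer would be `consumer.pause(lowPartitions)`, then `consumer.poll(...)` (which now returns records only from the non-paused, high-priority partitions), then `consumer.resume(lowPartitions)`.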
Upvotes: 3