miran

Reputation: 1519

Kafka Consumer - topic(s) with higher priority

I am using the Kafka Consumer to read from several topics, and I need one of them to have higher priority. The processing takes a lot of time and there are always many messages in the (low-priority) topics, but I need the messages from the other one to be processed as soon as possible.

It's similar to the question "Does Kafka support priority for topic or message?", but that one uses the old API.

In the new API (0.10.1.1), there are the methods

KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)

But it's not clear to me how to efficiently detect that there are new messages in the high-priority topic, and that consumption from the other topics therefore has to be paused.

Any ideas/examples?

Upvotes: 3

Views: 6751

Answers (2)

miran

Reputation: 1519

Finally I solved it as dawsaw advised: in the processing loop, I store the following for all the topics/partitions I read from:

  • beginningOffsets
  • endOffsets
  • committed - I can't use position, since I subscribe to topics, not to partitions.

Whenever (endOffset - committed) > 0 for any priority topic, I call consumer.pause() for the non-priority topics, and I resume them again once (endOffset - committed) == 0 for all priority topics.
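The rule above can be sketched as a small state machine. This is a stdlib-only sketch under stated assumptions, not the answerer's code: `PriorityGate` is a hypothetical name, and plain `"topic-partition"` strings stand in for Kafka's `TopicPartition`. In a real loop the three maps would be filled from `consumer.beginningOffsets(...)`, `consumer.endOffsets(...)` and `consumer.committed(tp).offset()` for the priority partitions. Falling back to the beginning offset when nothing has been committed yet is also an assumption, not something the answer states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper modelling the pause/resume rule from the answer.
// Keys are "topic-partition" strings standing in for Kafka's TopicPartition.
class PriorityGate {
    enum Action { NONE, PAUSE, RESUME }

    private boolean lowPriorityPaused = false;

    // Unconsumed records in one partition: endOffset - committed.
    // Assumption: with no commit yet (committed == null), count from the beginning offset.
    static long backlog(long beginningOffset, long endOffset, Long committed) {
        long from = (committed == null) ? beginningOffset : committed;
        return Math.max(0L, endOffset - from);
    }

    // True if any priority partition still has unconsumed records.
    static boolean priorityHasBacklog(Map<String, Long> beginning,
                                      Map<String, Long> end,
                                      Map<String, Long> committed) {
        for (Map.Entry<String, Long> e : end.entrySet()) {
            long begin = beginning.getOrDefault(e.getKey(), 0L);
            if (backlog(begin, e.getValue(), committed.get(e.getKey())) > 0) {
                return true;
            }
        }
        return false;
    }

    // Called once per loop iteration. Reports only transitions, so the caller
    // invokes consumer.pause(...) / consumer.resume(...) on the low-priority
    // partitions once per change instead of on every iteration.
    Action update(Map<String, Long> beginning, Map<String, Long> end, Map<String, Long> committed) {
        boolean hasBacklog = priorityHasBacklog(beginning, end, committed);
        if (hasBacklog && !lowPriorityPaused) {
            lowPriorityPaused = true;
            return Action.PAUSE;
        }
        if (!hasBacklog && lowPriorityPaused) {
            lowPriorityPaused = false;
            return Action.RESUME;
        }
        return Action.NONE;
    }

    boolean isLowPriorityPaused() {
        return lowPriorityPaused;
    }

    public static void main(String[] args) {
        PriorityGate gate = new PriorityGate();
        Map<String, Long> beginning = new HashMap<>();
        Map<String, Long> end = new HashMap<>();
        Map<String, Long> committed = new HashMap<>();
        beginning.put("urgent-0", 0L);
        end.put("urgent-0", 10L);
        committed.put("urgent-0", 10L);
        System.out.println(gate.update(beginning, end, committed)); // NONE
        end.put("urgent-0", 12L);        // two new priority records arrive
        System.out.println(gate.update(beginning, end, committed)); // PAUSE
        committed.put("urgent-0", 12L);  // ...and are processed and committed
        System.out.println(gate.update(beginning, end, committed)); // RESUME
    }
}
```

Tracking only transitions keeps the pause/resume calls idempotent from the consumer's point of view. Note that with the real client, committed(tp) returns an OffsetAndMetadata (or null), so its offset() has to be extracted before it goes into such a map.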

Upvotes: 7

ppatierno

Reputation: 10065

I guess you could use a mix of the position() and committed() methods. The position() method gets the offset of the next record that will be fetched, and the committed() method gets the last committed offset for the given partition (as described in the documentation). Before polling the lower-priority topics, you could check position() and committed() for the higher-priority one. If position() is higher than committed(), you could pause() the lower-priority topics and poll() the higher-priority one, then resume the lower-priority topics.
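The pause, poll, resume ordering described above can be illustrated with a toy, in-memory model. This is only a sketch: `ToyConsumer` and `PriorityLoop` are hypothetical classes that mimic pause/resume/poll semantics, not the real KafkaConsumer API, and the position()/committed() comparison is collapsed into a simple "has unread records" check (an assumption made to keep the example self-contained).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for a consumer subscribed to several topics.
class ToyConsumer {
    private final Map<String, Deque<String>> topics = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void produce(String topic, String value) {
        topics.computeIfAbsent(topic, t -> new ArrayDeque<>()).addLast(value);
    }

    // Stands in for the "position() > committed()" check: unread records exist.
    boolean hasPending(String topic) {
        Deque<String> q = topics.get(topic);
        return q != null && !q.isEmpty();
    }

    void pause(String topic) { paused.add(topic); }

    void resume(String topic) { paused.remove(topic); }

    // Returns at most one record per non-paused topic, like a small poll() batch.
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> e : topics.entrySet()) {
            if (!paused.contains(e.getKey()) && !e.getValue().isEmpty()) {
                batch.add(e.getValue().pollFirst());
            }
        }
        return batch;
    }
}

class PriorityLoop {
    // One loop iteration: if the high-priority topic has pending records,
    // pause the low-priority topic, poll (high-priority records only), then resume.
    static List<String> step(ToyConsumer c, String high, String low) {
        if (c.hasPending(high)) {
            c.pause(low);
            List<String> batch = c.poll();
            c.resume(low);
            return batch;
        }
        return c.poll();
    }

    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer();
        c.produce("low", "l1");
        c.produce("low", "l2");
        c.produce("high", "h1");
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            processed.addAll(step(c, "high", "low"));
        }
        System.out.println(processed); // [h1, l1, l2]
    }
}
```

Without the pause in step(), the first poll would return both l1 and h1, so the high-priority record would share a batch with low-priority work instead of being handled on its own first.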

Upvotes: 3
