PrabaharanKathiresan

Reputation: 1129

Kafka consumer in group skips the partitions

I have a single consumer reading a topic that has 6 partitions, and it is the only consumer in its group. I poll with Consumer.poll(10000) and exit the fetch loop as soon as a poll returns no records.

From the documentation, I understand that poll returns an empty result when there are no records to consume, and I assumed a timeout of 10000 ms is long enough to rebalance and fetch records. Most of the time poll consumes records from all partitions, but sometimes it fetches records from only 3 partitions and then returns an empty result without consuming the other 3 partitions.

BTW, I am using Kafka client 2.0.1, and the Kafka server version is 2.11-2.2.0.

Does anyone know why my consumer skips the other partitions and returns empty records? What should I do to consume all partitions?

Upvotes: 1

Views: 2867

Answers (1)

H.Ç.T

Reputation: 3549

The max.poll.records parameter defaults to 500, so a single poll() may not return messages from every partition of the topic.

max.poll.records: The maximum number of records returned in a single call to poll().
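
Since one poll() is capped and can come back before every partition has been fetched, one option is to keep polling and stop only after several consecutive empty results instead of on the first one. The following is a minimal sketch of that idea, not the asker's code: DrainLoop, drain and maxEmptyPolls are hypothetical names, and the poll results are simulated with a queue so the example runs without a broker. With a real KafkaConsumer, the supplier could presumably be something like () -> consumer.poll(Duration.ofSeconds(10)).count().

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.IntSupplier;

// Hypothetical helper for illustration; not part of the Kafka client API.
class DrainLoop {

    // Calls pollOnce repeatedly and stops only after maxEmptyPolls
    // consecutive empty results. Returns the total number of records seen.
    static int drain(IntSupplier pollOnce, int maxEmptyPolls) {
        int total = 0;
        int emptyInARow = 0;
        while (emptyInARow < maxEmptyPolls) {
            int count = pollOnce.getAsInt();
            if (count == 0) {
                emptyInARow++;      // tolerate an occasional empty poll
            } else {
                emptyInARow = 0;    // data arrived, reset the counter
                total += count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Simulated record counts per poll: an empty poll appears in the
        // middle even though 300 more records are still to come.
        Deque<Integer> simulated =
                new ArrayDeque<>(Arrays.asList(500, 120, 0, 300, 0, 0, 0));
        IntSupplier fakePoll = () -> simulated.isEmpty() ? 0 : simulated.poll();

        System.out.println(drain(fakePoll, 3)); // prints 920
    }
}
```

Exiting on the first empty poll (maxEmptyPolls = 1) would stop at 620 records in this simulation and miss the last 300, which mirrors the behaviour described in the question.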

By the way, having just one consumer in the group is not an appropriate way to consume a partitioned topic. As a best practice, the number of consumers in the consumer group should equal the number of partitions in the subscribed topic (by default, Kafka assigns partitions to consumers evenly). Otherwise you cannot scale the load horizontally, and having partitions is not very meaningful.

Kafka always assigns partitions to consumers. As long as the topic is subscribed, it is not possible to have a partition that is not assigned to any consumer.

In your case, however, because you exit the consumer, it takes some time (session.timeout.ms) for Kafka to consider that consumer dead. If you start the consumer again before session.timeout.ms has passed, Kafka sees two active consumers in the consumer group and assigns the partitions evenly between them (for example, partitions 0, 1, 2 to consumer-1 and partitions 3, 4, 5 to consumer-2). Once Kafka realizes that one of the consumers is dead, a rebalance is started in the consumer group and all partitions are assigned to the one active consumer.
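
The split described above can be illustrated with a small simulation. This is a simplified model of an even, contiguous assignment and not Kafka's actual assignor implementation; AssignmentSketch and assign are hypothetical names, and the member names are taken from the example in this answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model for illustration; not Kafka's assignor code.
class AssignmentSketch {

    // Splits partitions 0..partitionCount-1 into contiguous, near-equal
    // ranges, one range per group member, in member order.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> members) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = partitionCount / members.size();
        int extra = partitionCount % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            // The first 'extra' members receive one additional partition.
            int size = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < size; j++) {
                owned.add(next++);
            }
            result.put(members.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // The old (not yet expired) member and the restarted one share the topic.
        System.out.println(assign(6, Arrays.asList("consumer-1", "consumer-2")));
        // prints {consumer-1=[0, 1, 2], consumer-2=[3, 4, 5]}

        // After the dead member is removed, the survivor owns everything.
        System.out.println(assign(6, Collections.singletonList("consumer-2")));
        // prints {consumer-2=[0, 1, 2, 3, 4, 5]}
    }
}
```

While the first assignment is in effect, the restarted consumer only sees 3 of the 6 partitions, which would explain the empty polls for the other 3.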

session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms
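
For reference, both settings mentioned in this answer are plain consumer configuration keys. The sketch below only shows where they would go; ConsumerSettingsSketch is a hypothetical name, the broker address and group name are the placeholders used in the CLI command in this answer, and the session.timeout.ms value is an illustrative assumption rather than a recommendation.

```java
import java.util.Properties;

// Hypothetical helper for illustration; it only builds a Properties object
// and does not create a consumer or contact a broker.
class ConsumerSettingsSketch {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "yourConsumerGroup");       // placeholder group name
        // Upper bound on records returned by one poll(); 500 is the default
        // stated in this answer.
        props.setProperty("max.poll.records", "500");
        // Time without heartbeats after which the broker removes the member
        // and rebalances. Illustrative value; it must lie within the broker's
        // group.min.session.timeout.ms and group.max.session.timeout.ms.
        props.setProperty("session.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("max.poll.records"));   // prints 500
        System.out.println(props.getProperty("session.timeout.ms")); // prints 10000
    }
}
```

A Properties object like this is what would be passed to the consumer constructor, together with the deserializer settings that are omitted here.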

You can check the current partition assignment for your consumer group by running this CLI command on the broker:

./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group yourConsumerGroup

Upvotes: 1
