Reputation: 310
When a Kafka consumer of consumer group A connects to the Kafka broker, I would like it to seek to the end of all partitions, even if an offset is stored on the broker side. If additional consumers then connect for the same consumer group, they should pick up the latest stored offsets. I am doing the following:
consumer.poll(timeout)
consumer.seekToEnd(emptyList())
while (true) {
    val records = consumer.poll(timeout)
    if (records.isNotEmpty()) {
        // print records
        consumer.commitSync()
    }
}
The problem: when I connect the first consumer c1 of consumer group A, everything works as expected. But when I connect an additional consumer c2 of consumer group A, the group rebalances and c1 consumes the offsets it had skipped.
Any ideas?
Upvotes: 3
Views: 4783
Reputation: 7089
You could create a class which implements ConsumerRebalanceListener, as shown below:
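import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
    private final Consumer<K, V> consumer;

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do when partitions are taken away.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Skip to the end of every newly assigned partition, ignoring stored offsets.
        consumer.seekToEnd(partitions);
    }
}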
Then pass this listener when subscribing to the topics:
consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));
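As for why the skipped offsets come back: in the loop from the question, commitSync() is only called when records arrive, so the end position reached by seekToEnd is not necessarily stored on the broker. My understanding is that after a rebalance the consumer resumes from the committed offset unless something seeks again. Below is a toy model of that reasoning in plain Java. It is only a sketch and does not use the Kafka client API; the class RebalanceOffsetModel, its methods, and the offsets 5 and 100 are all made up for illustration.

```java
// Toy model of one partition's offset bookkeeping. This is NOT the Kafka client API;
// all names here are hypothetical and only illustrate the reasoning.
class RebalanceOffsetModel {
    private final long logEndOffset; // offset the next produced record would get
    private long committedOffset;    // offset stored on the broker for the group
    private long position;           // in-memory fetch position of the current owner

    RebalanceOffsetModel(long committedOffset, long logEndOffset) {
        this.committedOffset = committedOffset;
        this.logEndOffset = logEndOffset;
        this.position = committedOffset; // a new owner starts from the committed offset
    }

    // Moves only the in-memory position; nothing is stored on the broker.
    void seekToEnd() {
        position = logEndOffset;
    }

    // Stores the current position as the group's committed offset.
    void commit() {
        committedOffset = position;
    }

    // Assumption: after a rebalance the owner resumes from the committed offset,
    // unless a listener seeks to the end in its assignment callback.
    void rebalance(boolean seekToEndOnAssign) {
        position = committedOffset;
        if (seekToEndOnAssign) {
            seekToEnd();
        }
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        // Seek to end, never commit, then rebalance without a listener.
        RebalanceOffsetModel withoutListener = new RebalanceOffsetModel(5L, 100L);
        withoutListener.seekToEnd();
        withoutListener.rebalance(false);
        System.out.println(withoutListener.position()); // 5: the skipped offsets are read again

        // Same situation, but the assignment callback seeks to the end.
        RebalanceOffsetModel withListener = new RebalanceOffsetModel(5L, 100L);
        withListener.seekToEnd();
        withListener.rebalance(true);
        System.out.println(withListener.position()); // 100: stays at the end
    }
}
```

In the model, committing right after the initial seek would also keep the position at the end, which is why the listener approach and an explicit commit both address the symptom.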
Upvotes: 7