clausmc

Reputation: 310

seekToEnd of all partitions and survive automatic rebalancing of Kafka consumers

When a Kafka consumer of consumer group A connects to the Kafka broker, I would like it to seek to the end of all partitions, even if an offset is stored on the broker side. If additional consumers join the same consumer group, they should pick up the latest stored offsets. I am doing the following:

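// Initial poll so the consumer joins the group and gets its partitions assigned
consumer.poll(timeout)
// An empty collection means "all currently assigned partitions"
consumer.seekToEnd(emptyList())

while (true) {
  val records = consumer.poll(timeout)
  if (!records.isEmpty()) {
    // print records
    consumer.commitSync()
  }
}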

When I connect the first consumer c1 of consumer group A, everything works as expected. The problem appears when I connect an additional consumer c2 of consumer group A: the group rebalances and c1 then consumes the offsets it had skipped.

Any ideas?

Upvotes: 3

Views: 4783

Answers (1)

amethystic

Reputation: 7089

You could create a class which implements ConsumerRebalanceListener, as shown below:

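import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {

    private final Consumer<K, V> consumer;

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do on revocation
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Runs after every rebalance: skip to the end of each newly assigned partition
        consumer.seekToEnd(partitions);
    }
}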

Then pass this listener when subscribing to the topic:

consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));
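To see why a one-off seekToEnd does not survive a rebalance while the listener does, here is a toy model in plain Java. It is only a sketch and uses no Kafka classes: ToyConsumer, RebalanceDemo and the offsets (log end 100, committed 40) are made up for illustration. It assumes that after a rebalance the position of every assigned partition is rebuilt from the committed offset, and that nothing was committed in between because the poll loop only commits when records arrive.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these are NOT Kafka classes, just enough state to show the effect.
final class ToyConsumer {
    private final Map<Integer, Long> logEnd;     // partition -> log end offset
    private final Map<Integer, Long> committed;  // partition -> committed group offset
    private final Map<Integer, Long> position = new HashMap<>(); // in-memory fetch positions
    private final boolean seekToEndOnAssign;     // stands in for the rebalance listener

    ToyConsumer(Map<Integer, Long> logEnd, Map<Integer, Long> committed, boolean seekToEndOnAssign) {
        this.logEnd = logEnd;
        this.committed = new HashMap<>(committed);
        this.seekToEndOnAssign = seekToEndOnAssign;
    }

    // Models a (re)assignment: positions are rebuilt from the committed offsets (assumption).
    void assign(List<Integer> partitions) {
        position.clear();
        for (int p : partitions) {
            position.put(p, committed.getOrDefault(p, 0L));
        }
        if (seekToEndOnAssign) {
            seekToEnd(); // what onPartitionsAssigned does in the listener
        }
    }

    void seekToEnd() {
        position.replaceAll((p, offset) -> logEnd.get(p));
    }

    // Returns how many records would be delivered; commits only if there were any.
    long poll() {
        long delivered = 0;
        for (Map.Entry<Integer, Long> e : position.entrySet()) {
            long end = logEnd.get(e.getKey());
            delivered += end - e.getValue();
            e.setValue(end);
        }
        if (delivered > 0) {
            committed.putAll(position);
        }
        return delivered;
    }
}

final class RebalanceDemo {
    // Records redelivered to c1 after a second consumer takes over partition 1.
    static long replayedAfterRebalance(boolean seekToEndOnAssign) {
        Map<Integer, Long> logEnd = Map.of(0, 100L, 1, 100L);
        Map<Integer, Long> committed = Map.of(0, 40L, 1, 40L);
        ToyConsumer c1 = new ToyConsumer(logEnd, committed, seekToEndOnAssign);
        c1.assign(List.of(0, 1));
        c1.seekToEnd();        // the one-off seek from the question
        c1.poll();             // nothing new arrived, so nothing is committed
        c1.assign(List.of(0)); // c2 joins, c1 keeps only partition 0
        return c1.poll();
    }

    public static void main(String[] args) {
        System.out.println("without listener: " + replayedAfterRebalance(false));
        System.out.println("with listener:    " + replayedAfterRebalance(true));
    }
}
```

In this model the run without the listener redelivers the 60 skipped records on partition 0, and the run with the listener redelivers none. Note that seeking in the listener applies to every consumer on every rebalance, so a consumer joining later will also start at the end rather than at the stored offsets.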

Upvotes: 7
