Niranjan

Reputation: 2921

Why do calls to Kafka's seekToBeginning and seekToEnd APIs hang forever?

My environment:

My configurations:

During startup of my app, I have to set the offset to either the beginning or the end of the topic, depending on a flag. For this I am using the following code:

final List<PartitionInfo> partitionsInfos = kafkaConsumer.partitionsFor(this.topic);

final List<TopicPartition> assignedPartitions = FluentIterable
      .from(partitionsInfos)
      .filter(Predicates.notNull())
      .transform(new Function<PartitionInfo, TopicPartition>() {
          @Override
          public TopicPartition apply(final PartitionInfo input) {
            return new TopicPartition(topic, input.partition());
          }
      }).toList();

switch (listenMode) {
case OLDEST:
  kafkaConsumer.seekToBeginning(assignedPartitions);
  break;
case LATEST:
  kafkaConsumer.seekToEnd(assignedPartitions);
  break;
default:
  break;
}

This code is not working as expected. It hangs forever on seekToBeginning and seekToEnd calls.

Am I missing anything?

Upvotes: 3

Views: 2856

Answers (1)

Balkrushn Viroja

Reputation: 163

Before you can seek(), you first need to subscribe() to a topic or assign() partitions of a topic to the consumer. Note that subscribe() is lazy: the partitions are only assigned during poll(), so after subscribe() you also need to make a "dummy call" to poll() before you can use seek(), seekToBeginning(), or seekToEnd(). With assign(), the assignment takes effect as soon as the call returns, so you can seek right away.
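A minimal sketch of how this could look with manual assignment, assuming a client version where assign(), seekToBeginning() and seekToEnd() accept a Collection (as in the question's code). The class name SeekOnStartup, the method assignAndSeek and the fromBeginning flag are hypothetical names for illustration, not part of the Kafka API. As far as I know, assign() takes effect immediately, so this variant does not need a dummy poll(); the seek calls themselves are evaluated lazily, so the sketch calls position() to force them to resolve.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public final class SeekOnStartup {

    private SeekOnStartup() {
    }

    /**
     * Hypothetical helper: assigns every partition of the topic to the
     * consumer and then seeks to the beginning or the end of each one.
     * Assumes the topic exists, so partitionsFor() returns its partitions.
     */
    public static <K, V> List<TopicPartition> assignAndSeek(
            Consumer<K, V> consumer, String topic, boolean fromBeginning) {
        // Build the partition list from the topic metadata.
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo info : consumer.partitionsFor(topic)) {
            partitions.add(new TopicPartition(topic, info.partition()));
        }

        // The step missing in the question: the consumer has to own the
        // partitions before it can seek on them.
        consumer.assign(partitions);

        if (fromBeginning) {
            consumer.seekToBeginning(partitions);
        } else {
            consumer.seekToEnd(partitions);
        }

        // seekToBeginning()/seekToEnd() are evaluated lazily; position()
        // forces the offset lookup. Against a real broker this call blocks
        // until the offset is known.
        for (TopicPartition partition : partitions) {
            consumer.position(partition);
        }
        return partitions;
    }
}
```

In the question's code this would replace the switch statement, for example SeekOnStartup.assignAndSeek(kafkaConsumer, topic, listenMode == OLDEST). If you use subscribe() instead of assign(), the dummy poll() described above is still needed before seeking.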

Upvotes: 5
