Reputation: 2921
My environment:
kafka_2.10-0.10.0.0
kafka-clients-0.10.0.0
My configurations:
event_notification
20
1
event_cg01
false
As per a requirement, during startup of my app, based on a flag, I have to set the offset either to beginning, or end. For this I am using following code:
final List<PartitionInfo> partitionsInfos = kafkaConsumer.partitionsFor(this.topic);
final List<TopicPartition> assignedPartitions = FluentIterable
.from(partitionsInfos)
.filter(Predicates.notNull())
.transform(new Function<PartitionInfo, TopicPartition>() {
@Override
public TopicPartition apply(final PartitionInfo input) {
return new TopicPartition(topic, input.partition());
}
}).toList();
switch (listenMode) {
case OLDEST:
kafkaConsumer.seekToBeginning(assignedPartitions);
break;
case LATEST:
kafkaConsumer.seekToEnd(assignedPartitions);
break;
default:
break;
}
This code is not working as expected. It hangs forever on seekToBeginning
and seekToEnd
calls.
Am I missing anything?
Upvotes: 3
Views: 2856
Reputation: 163
Before you can seek()
you first need to subscribe()
to a topic or assign()
partition of a topic to the consumer. Please note, that subscribe()
and assign()
are lazy calls therefore, you also need to do a "dummy call" to poll()
before you can use seek() or seekToBeginning()
or seekToEnd()
.
Upvotes: 5