Reputation: 593
I have a Kafka topic with 2 partitions, and I want to create a consumer that reads the topic from a given offset. Below is the sample code I am using to read from offset 9:
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
// Used only when there is no committed offset or the requested offset is out of range
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Move every newly assigned partition to offset 9
        for (TopicPartition topicPartition : partitions) {
            kafkaConsumer.seek(topicPartition, 9L);
        }
    }
});
try {
    while (true) {
        // poll(Duration) replaces the deprecated poll(long)
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.topic() + ", " + record.partition() + ", " + record.offset() + ", " + record.value());
        }
    }
} catch (WakeupException ex) {
    System.out.println("Exception caught " + ex.getMessage());
} finally {
    kafkaConsumer.close();
}
But I am seeing the following error:
org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset 9 is out of range for parition test-topic_partitions-0, resetting offset
org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test-topic_partitions-0 to offset
I am using the Maven dependency below:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
I should also mention that delete.retention.ms for this topic is configured as 86400000 (1 day) and retention.ms is configured as 172800000 (2 days).
Can someone help me resolve this error?
Upvotes: 1
Views: 666
Reputation: 26950
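This error means that the partition does not have a record at offset 9. So either offset 9 is past the end of the partition, or it is before the first offset the partition still holds (for example, because older records were deleted by the retention policy).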
You can use beginningOffsets() and endOffsets() to find the smallest and largest offsets in your partition. If you try to seek to an offset outside this range, the auto.offset.reset policy kicks in to find a valid offset.
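As a rough sketch of that range check (not taken from the Kafka client itself), the helper below clamps a requested offset into the range reported for a partition. The class and method names (OffsetRange, clamp, inRange) are made up for illustration, and the sketch assumes the two bounds come from beginningOffsets() and endOffsets(), where the end offset is the offset the next record would receive.

```java
// Hypothetical helper, not part of kafka-clients: keeps a requested seek
// position inside the offsets a partition currently holds.
final class OffsetRange {

    // beginning: first available offset (as reported by beginningOffsets()).
    // end: offset the next record will receive (as reported by endOffsets()).
    static long clamp(long requested, long beginning, long end) {
        if (beginning > end) {
            throw new IllegalArgumentException("beginning offset is after end offset");
        }
        return Math.max(beginning, Math.min(requested, end));
    }

    // True when seeking to 'requested' would not trigger auto.offset.reset.
    static boolean inRange(long requested, long beginning, long end) {
        return requested >= beginning && requested <= end;
    }

    public static void main(String[] args) {
        // Partition holding offsets 0..4: offset 9 is past the end.
        System.out.println(clamp(9L, 0L, 5L));   // prints 5
        // Partition whose records before offset 12 were deleted by retention.
        System.out.println(clamp(9L, 12L, 20L)); // prints 12
        // Offset 9 is available, so it is left unchanged.
        System.out.println(clamp(9L, 0L, 20L));  // prints 9
    }
}
```

In the rebalance listener from the question, this could be used as kafkaConsumer.seek(tp, OffsetRange.clamp(9L, begin.get(tp), end.get(tp))), where begin and end are the maps returned by kafkaConsumer.beginningOffsets(partitions) and kafkaConsumer.endOffsets(partitions). Whether clamping or failing loudly is the better reaction depends on the application, so treat this as one possible approach.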
Upvotes: 1