nagendra
nagendra

Reputation: 593

Kafka single consumer to read from topic having multiple partitions from given offset

I have a kafka topic with 2 partitions, I want to create a consumer to read topic from a given offset, below is the sample code that I am using to read from given offset which is 9

Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

KafkaConsumer kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for(TopicPartition topicPartition: partitions) {
            consumer.seek(topicPartition, 9);
        }
    }
});

try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.topic() + ", " + record.partition() + ", " + record.offset() + ", " + record.value());
        }
    }
}catch(WakeupException ex){
    System.out.println("Exception caught " + ex.getMessage());
}finally{
    kafkaConsumer.close();
}

But i am seeing the following error

org.apache.kafka.clients.consumer.internals.Fetcher -  Fetch offset 9 is out of range for parition test-topic_partitions-0, resetting offset
org.apache.kafka.clients.consumer.internals.Fetcher -  Resetting offset for partition test-topic_partitions-0 to offset 

I am using below maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

And also to mention the delete.retention.ms configured for this topic is 86400000 (which is 1 day) and retention.ms is configured as 172800000 (which is 2 days)

Can someone help how to resolve the error ?

Upvotes: 1

Views: 666

Answers (1)

Mickael Maison
Mickael Maison

Reputation: 26950

This error means that the partition does not have a record at offset 9. So either:

  • you have currently less than 9 records in the partition
  • records at least up to 9 have been deleted by the retention policy

You can use endOffsets() and beginningOffsets() to find the smallest and largest offsets in your partition. If you try to seek to an offset out of this range, the reset policy auto.offset.reset triggers to find a valid offset.

Upvotes: 1

Related Questions