neb

Reputation: 135

How to reset Kafka consumer offset when calling consumer many times

I'm trying to reset the consumer offset whenever I call the consumer, so that when I call it many times it can still read the records sent by the producer. I'm setting props.put("auto.offset.reset","earliest"); and calling consumer.seekToBeginning(consumer.assignment());, but when I call the consumer the second time it receives no records. How can I fix this?

public ConsumerRecords<String, byte[]> consumer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    //props.put("group.id", String.valueOf(System.currentTimeMillis()));
    props.put("auto.offset.reset","earliest");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("topiccc"));
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    consumer.seekToBeginning(consumer.assignment());
   /* List<byte[]> videoContents = new ArrayList<byte[]>();
    for (ConsumerRecord<String, byte[]> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        videoContents.add(record.value());
    }*/

    return records;
}

public String producer(@RequestParam("message") String message) {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    Producer<String, byte[]> producer = new KafkaProducer<>(props);
    Path path = Paths.get("C:/Programming Files/video-2012-07-05-02-29-27.mp4");
    ProducerRecord<String, byte[]> record = null;
    try {

        record = new ProducerRecord<>("topiccc", "keyyyyy"
                , Files.readAllBytes(path));

    } catch (IOException e) {
        e.printStackTrace();
    }
    producer.send(record);
    producer.close();
    //kafkaSender.send(record);

    return "Message sent to the Kafka Topic java_in_use_topic Successfully";
}

Upvotes: 3

Views: 4422

Answers (4)

Yatin Goyal

Reputation: 342

When you want one message to be consumed multiple times, the ideal way is to create consumers in different consumer groups, so that each group can consume the same message.

But if you want the same consumer to consume the same message multiple times, you can play with the commit and offset settings: set auto.commit.interval.ms very high, or disable auto-commit entirely and commit according to your own logic.

You can refer to the KafkaConsumer javadoc for more details on how to manually manage offsets: https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
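A minimal sketch of that approach, reusing the broker address, group, topic, and deserializers from the question. The testable part only builds the configuration; the consume/commit loop is shown in comments because it assumes kafka-clients is on the classpath:

```java
import java.util.Properties;

// Sketch: configuration for manual offset management. Auto-commit is disabled,
// so the committed offset only advances when we commit explicitly.
public class ManualCommitConfig {

    static Properties manualCommitProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // commit manually instead
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = manualCommitProps();
        System.out.println(props.getProperty("enable.auto.commit")); // prints "false"
        // With kafka-clients on the classpath, the consume/commit loop would be:
        //   KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        //   consumer.subscribe(Collections.singletonList("topiccc"));
        //   ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
        //   ... process records ...
        //   consumer.commitSync(); // commit only after processing succeeds
    }
}
```

Because nothing is committed until commitSync() runs, a crash before the commit means the same records are redelivered on the next poll after a restart.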

Upvotes: 0

Luke Machowski

Reputation: 4211

From the Kafka Java Code, the documentation on AUTO_OFFSET_RESET_CONFIG says the following:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

This can be found in GitHub: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

We can see from this comment that the setting is only used when there is no committed offset on the server. In the question, an offset is retrieved from the server, which is why the position is not reset to the beginning but stays at the last committed offset, making it appear that there are no more records.

You would need to explicitly reset the offset on the server side to fix this as requested in the question.

Here is another answer that describes how that could be done: https://stackoverflow.com/a/54492802/231860

This is a snippet of code that allowed me to reset the offset. NOTE: You can't call seekToBeginning if you call the subscribe method. I could only get it to work by assigning the partitions myself with the assign method. Pity.

    // Create the consumer:
    final Consumer<String, DataRecord> consumer = new KafkaConsumer<>(props);

    // Get the partitions that exist for this topic:
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);

    // Get the topic partition info for these partitions:
    List<TopicPartition> topicPartitions = partitions.stream()
            .map(info -> new TopicPartition(info.topic(), info.partition()))
            .collect(Collectors.toList());

    // Assign all the partitions to this consumer so that we can seek to the beginning:
    // NOTE: We can't use subscribe if we use assign, but we can't seek to the beginning if we use subscribe.
    consumer.assign(topicPartitions);

    // Make sure we seek to the beginning of the partitions:
    consumer.seekToBeginning(topicPartitions);
    

Yes, it seems extremely complicated for such a seemingly rudimentary use case. It might indicate that the whole Kafka world just expects streams to be read once.

Upvotes: 2

Anouer Hermassi

Reputation: 151

There is a workaround for this (though not a production solution): change the group.id configuration value each time you consume. Setting auto.offset.reset to earliest is not enough in many cases.
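A quick sketch of that workaround, building on the Properties-based configuration from the question. The group name prefix "reader-" is just an illustrative choice; the consumer construction itself needs kafka-clients, so it appears only as a comment:

```java
import java.util.Properties;
import java.util.UUID;

// Sketch: build consumer properties with a fresh group.id on every call, so the
// group has no committed offset on the broker and auto.offset.reset=earliest applies.
public class FreshGroupConfig {

    static Properties freshGroupProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "reader-" + UUID.randomUUID()); // new group each call
        props.put("auto.offset.reset", "earliest"); // a new group starts from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Two calls produce distinct group ids, so each consumer re-reads the topic:
        System.out.println(freshGroupProps().getProperty("group.id")
                .equals(freshGroupProps().getProperty("group.id"))); // prints "false"
        // With kafka-clients on the classpath:
        //   KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(freshGroupProps());
        //   consumer.subscribe(Collections.singletonList("topiccc"));
    }
}
```

Note the trade-off: every call leaves a stale consumer group behind on the broker, which is one reason this is not a production solution.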

Upvotes: 0

Lakitu Lakitutu

Reputation: 278

I usually create a new consumer with a different group.id to read the records again. So do it like this:

    props.put("group.id", String.valueOf(Instant.now().getEpochSecond()));
    

Upvotes: 0
