Jin Lee
Jin Lee

Reputation: 3512

My Java Consumer can't read messages from Broker even with auto.offset.reset - earliest

I have a topic named `test' in my Broker. I checked it with the CLI.

I created a java producer to send messages to the topic test. I can consume them from my CLI.

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning (I am running this on Windows)

However, when I run it in my Java Consumer program, it doesn't consume any messages even though I set the auto.offset.reset to earliest. What am I doing wrong?

public class Consumer1 {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "jin");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //consumer.subscribe(Collections.singletonList("test"));
        consumer.subscribe(Arrays.asList("test"));

        try {
            while (true) {              
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                //ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records){
                    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                }
                //consumer.commitAsync();               
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("closed");
        }   
    }
}

Upvotes: 1

Views: 300

Answers (1)

Ryuzaki L
Ryuzaki L

Reputation: 40068

auto.offset.reset This property will if it is a brand new consumer group, or if consumer group offset is deleted. It will not work for consumer group that already has offset stored in Kafka

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

Upvotes: 1

Related Questions