Reputation: 3512
I have a topic named `test` in my broker; I verified it with the CLI.
I created a Java producer to send messages to the topic `test`, and I can consume them from the CLI:
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
(I am running this on Windows)
However, when I run my Java consumer program, it doesn't consume any messages, even though I set `auto.offset.reset` to `earliest`. What am I doing wrong?
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "jin");
        props.put("enable.auto.commit", "true");
        // Only applies when the group has no committed offset for the partition
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //consumer.subscribe(Collections.singletonList("test"));
        consumer.subscribe(Arrays.asList("test"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                //ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // %n ends each record on its own line
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                //consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("closed");
        }
    }
}
Upvotes: 1
Views: 300
Reputation: 40068
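The `auto.offset.reset` property takes effect only if the consumer group is brand new or if its committed offsets have been deleted. It has no effect for a consumer group that already has offsets stored in Kafka, so a group such as `jin` that has consumed and committed before will resume from its last committed offset instead of the beginning.
The Kafka documentation describes the property as follows: "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)".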
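One way to see this behavior is to run the consumer under a group id that has never committed offsets. The following is a minimal sketch, assuming a hypothetical helper class `FreshGroupConfig` (not part of the original post or of the Kafka API) that appends a random suffix to the group id; it only builds the `Properties`, which you would then pass to `new KafkaConsumer<>(props)` as in the question.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper for illustration; the class and method names are assumptions.
class FreshGroupConfig {

    // Builds consumer properties with a group.id that should have no committed
    // offsets, so auto.offset.reset=earliest is applied on the first poll.
    static Properties consumerProps(String bootstrapServers, String baseGroupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Random suffix: assumes no group with this exact name exists on the broker.
        props.put("group.id", baseGroupId + "-" + UUID.randomUUID());
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "jin");
        // Prints the generated group id, which differs on every run.
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

Because every run gets a new group, the consumer re-reads the topic from the start each time and never resumes where it left off, so this is suited to testing rather than production. If you want to keep the group id `jin`, an alternative is to reset its offsets while no consumer in the group is running, for example with the `kafka-consumer-groups` tool and its `--reset-offsets --to-earliest --execute` options.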
Upvotes: 1