Ehsan
Ehsan

Reputation: 318

Kafka consumer does not start from latest message

I want to have a Kafka Consumer which starts from the latest message in a topic.

Here is the java code:

private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
    properties.setProperty("bootstrap.servers","localhost");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "latest");
    consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singletonList("mytopic"));
}

@Override
public StreamHandler call() throws Exception
{
    while (true) 
    {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
        Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
        for(ConsumerRecord<String, String> rec : records)
        {
            System.out.println(rec.value());
        }
    }
}

Although the value for auto.offset.reset is latest, but the consumer starts form messages which belong to 2 days ago and then it catches up with the latest messages.

What am I missing?

Upvotes: 7

Views: 12849

Answers (2)

Brian Ecker
Brian Ecker

Reputation: 2087

Have you run this same code before with the same group.id? The auto.offset.reset parameter is only used if there is not an existing offset already stored for your consumer. So if you've run the example previously, say two days ago, and then you run it again, it will start from the last consumed position.

Use seekToEnd() if you would like to manually go to the end of the topic.

See https://stackoverflow.com/a/32392174/1392894 for a slightly more thorough discussion of this.

Upvotes: 14

Hans Jespersen
Hans Jespersen

Reputation: 8335

If you want to manually control the position of your offsets you need to set enable.auto.commit = false.

If you want to position all offsets to the end of each partition then call seekToEnd()

https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(java.util.Collection)

Upvotes: 2

Related Questions