Michal Špondr
Michal Špondr

Reputation: 1535

How to seek to the last offset in Kafka correctly?

I am trying to read Kafka topic from last offset, but I am not able to do it correctly:

In properties file I set these:

group.id=my_group
client.id=my_client
enable.auto.commit=true
auto.offset.reset=earliest
isolation.level=read_committed

Then the code:

consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());

I can see how offset is printed from 0 even if I have some items in topic.

In log file I can see this (shortened):

[Consumer clientId=my_client, groupId=my_group] Finished assignment for group at generation 1: {my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=Assignment(partitions=[mytopic-0])}

[Consumer clientId=my_client, groupId=my_group] Successfully joined group with generation 1 [Consumer clientId=my_client, groupId=my_group] Notifying assignor about the new Assignment(partitions=[mytopic-0])

[Consumer clientId=my_client, groupId=my_group] Adding newly assigned partitions: mytopic-0

[Consumer clientId=my_client, groupId=my_group] Found no committed offset for partition mytopic-0

[Consumer clientId=my_client,> groupId=my_group] Resetting offset for partition mytopic-0 to offset 0.

Any idea what am I doing wrong? I tried some magic with seekToBeginning() + poll() + commitSync() + seekToEnd() and it somehow worked, but I think this should work by default.

Upvotes: 0

Views: 3333

Answers (1)

divilipir
divilipir

Reputation: 950

auto.offset.reset=latest ll solve ur problem

https://docs.confluent.io/platform/current/clients/consumer.html#offset-management

Upvotes: 1

Related Questions