Saloni Vithalani

Reputation: 323

How to always consume from the latest offset in kafka-streams

Our requirement is that if a kafka-streams app is consuming a partition, it should start its consumption from the latest offset of that partition.

This seems doable using:

streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

Now, let's say that with the above configuration, the kafka-streams app starts consuming a partition from its latest offset. After some time, the app crashes. When the app comes back up, we want it to consume data from the latest offset of that partition, instead of from where it last left off.

But I can't find anything in the kafka-streams API that helps achieve this.

P.S. We are using kafka-1.0.0.

Upvotes: 9

Views: 14263

Answers (1)

Matthias J. Sax

Reputation: 62350

That is not supported out-of-box.

The auto.offset.reset config only applies if there are no committed offsets (or if a committed offset is no longer valid, for example because the data was deleted), and there is no config to change this behavior.
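The rule can be illustrated with a small, stdlib-only model of how a consumer chooses where to start reading a partition. This is a simplified sketch, not Kafka's implementation: `OffsetResetModel`, `ResetPolicy` and `startingOffset` are hypothetical names, and details such as offset retention are ignored. It shows why "latest" does not help after a crash: once an in-range offset has been committed, the reset policy is never consulted.

```java
import java.util.OptionalLong;

/**
 * Hypothetical, simplified model of how a Kafka consumer picks its starting
 * position in a partition. Not part of any Kafka API; illustration only.
 */
public class OffsetResetModel {

    /** Mirrors the possible values of the auto.offset.reset config. */
    public enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed      offset committed by the consumer group, if any
     * @param logStartOffset first offset still available in the partition
     * @param logEndOffset   offset of the next record to be written ("latest")
     * @param policy         the auto.offset.reset setting
     */
    public static long startingOffset(OptionalLong committed, long logStartOffset,
                                      long logEndOffset, ResetPolicy policy) {
        // A committed offset that is still in range always wins;
        // the reset policy is not consulted at all in that case.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStartOffset && c <= logEndOffset) {
                return c;
            }
        }
        // No committed offset, or it is out of range: fall back to the policy.
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                // A real consumer would throw an exception here as well.
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // First start: nothing committed yet, so "latest" applies.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 500, ResetPolicy.LATEST)); // 500
        // Restart after a crash: 420 was committed, so it resumes at 420, not at 900.
        System.out.println(startingOffset(OptionalLong.of(420), 0, 900, ResetPolicy.LATEST)); // 420
    }
}
```

The second call is exactly the situation from the question: the app restarts with `auto.offset.reset=latest`, but because offset 420 was committed before the crash, consumption resumes there.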

You could manipulate the offsets manually before startup using bin/kafka-consumer-groups.sh, though: the application.id is the group.id, so you could "seek to end" before you restart the application.
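The same "seek to end" can be done from code just before starting the application, by committing the current end offsets for the group named after the application.id. This is a sketch under stated assumptions, not an official Kafka Streams feature: it uses the plain KafkaConsumer API from kafka-clients (`endOffsets` and `commitSync(Map)`, both available in 1.0.0), while the class `SeekToEndBeforeStart`, its methods, and any server/topic names are hypothetical. It only moves the offsets of the input topics you pass in, and it is expected to work only while no instance of the application is running; otherwise the broker should reject the commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/** Hypothetical helper: commit "latest" for every input partition before KafkaStreams.start(). */
public class SeekToEndBeforeStart {

    public static void commitEndOffsets(String bootstrapServers, String applicationId,
                                        List<String> inputTopics) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Kafka Streams uses the application.id as the consumer group.id.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (String topic : inputTopics) {
                List<PartitionInfo> infos = consumer.partitionsFor(topic);
                if (infos == null) {
                    continue; // unknown topic (older clients may return null)
                }
                for (PartitionInfo p : infos) {
                    partitions.add(new TopicPartition(p.topic(), p.partition()));
                }
            }
            // endOffsets() returns the offset of the next record to be written, i.e. "latest".
            Map<TopicPartition, Long> ends = consumer.endOffsets(partitions);
            // Commit without joining the group; this is accepted only if the group has no active members.
            consumer.commitSync(toCommitMap(ends));
        }
    }

    /** Wraps raw end offsets in the OffsetAndMetadata values that commitSync expects. */
    static Map<TopicPartition, OffsetAndMetadata> toCommitMap(Map<TopicPartition, Long> ends) {
        return ends.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                                          e -> new OffsetAndMetadata(e.getValue())));
    }
}
```

For example, calling `SeekToEndBeforeStart.commitEndOffsets("localhost:9092", "my-streams-app", Arrays.asList("input-topic"))` right before `streams.start()` would make the restarted app skip whatever was written to `input-topic` while it was down. Internal repartition and changelog topics are deliberately left alone here, and any local state in state stores is not reset.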

Update:

Since the 1.1.0 release, you can use the bin/kafka-streams-application-reset.sh tool to set the starting offsets. To use the tool, the application must be offline. (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application)

Upvotes: 10
