micgn

Reputation: 265

Reset consumer offset to the beginning from Kafka Streams

I am using Kafka Streams and want to reset some consumer offsets to the beginning from Java code. KafkaConsumer.seekToBeginning(...) sounds like the right tool, but I am working with Kafka Streams rather than a plain consumer:

KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();

I assume that, depending on the concrete streams pipeline I define, this creates several consumers under the hood. Can I get access to them? Or is there some other way to reset offsets programmatically?

Upvotes: 3

Views: 4321

Answers (3)

Artur

Reputation: 1

In Kafka Streams there is a trick, if it works for you: assign a random value to APPLICATION_ID_CONFIG. A new application ID has no committed offsets, so every start reads from the beginning. It is a dirty way to do it, though.
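A minimal sketch of this trick, written with plain string keys so that it compiles without the Kafka client on the classpath. "application.id" is the key behind StreamsConfig.APPLICATION_ID_CONFIG, and "auto.offset.reset" is the consumer setting that applies when no committed offsets exist. The class name, method name, and prefix parameter are hypothetical.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper: builds Streams properties with a never-before-used application ID.
class FreshApplicationId {

    static Properties freshStreamsProps(String prefix, String bootstrapServers) {
        Properties props = new Properties();
        // "application.id" is the value of StreamsConfig.APPLICATION_ID_CONFIG.
        // A random suffix means no committed offsets exist for this ID yet.
        props.put("application.id", prefix + "-" + UUID.randomUUID());
        props.put("bootstrap.servers", bootstrapServers);
        // With no committed offsets, start from the earliest available record.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```

The returned Properties can be passed to the KafkaStreams constructor in place of props. Each run leaves a new consumer group behind on the broker (and, for stateful topologies, new internal topics), which is part of why this counts as a dirty approach.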

Upvotes: 0

Frederik Petersen

Reputation: 478

Building on Hans Jespersen's answer, I successfully used the following Java code to do what the reset script does:

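import kafka.tools.StreamsResetter;

// Pass the same arguments that the command-line reset script accepts.
StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
// run() returns the tool's exit code (0 on success).
int exitCode = resetter.run(args);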

The class is part of the Kafka core library, which I imported in Maven with:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>${kafka.version}</version>
    </dependency>

Upvotes: 10

Hans Jespersen

Reputation: 8335

Since you are using Kafka Streams, you will want to reset not only the consumer offsets but also the Streams internal state stores.

Fortunately, Kafka ships with a Streams Application Reset Tool for this.

See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
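As a rough sketch, the tool can be launched from Java by building its command line and handing it to ProcessBuilder. The script name kafka-streams-application-reset.sh and the three flags are the ones the tool ships with. The class name, method names, and the script path argument are hypothetical and depend on where Kafka is installed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper around the reset script shipped in Kafka's bin/ directory.
class ResetToolCommand {

    // Builds the argument list; scriptPath is an assumption about the local install.
    static List<String> build(String scriptPath, String appId,
                              String bootstrapServers, List<String> inputTopics) {
        List<String> cmd = new ArrayList<>();
        cmd.add(scriptPath);
        cmd.add("--application-id");
        cmd.add(appId);
        cmd.add("--bootstrap-servers");
        cmd.add(bootstrapServers);
        cmd.add("--input-topics");
        // The tool takes input topics as one comma-separated value.
        cmd.add(String.join(",", inputTopics));
        return cmd;
    }

    // Runs the script and returns its exit code; call only while the application is stopped.
    static int run(List<String> cmd) throws IOException, InterruptedException {
        return new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```

The tool resets the committed offsets for the input topics and deletes the application's internal topics, but it does not touch local state on the application hosts. Calling streams.cleanUp() before streams.start() is the usual way to clear that.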

Upvotes: 2
