Reputation: 265
I am using Kafka streams and want to reset some consumer offset from Java to the beginning.
KafkaConsumer.seekToBeginning(...)
sounds like the right thing to do, but I work with Kafka Streams:
KafkaStreams streams = new KafkaStreams(builder, props);
...
streams.start();
I guess that depending on the concrete streams pipeline I define this would create several consumers under the hood. Can I get access to those? Or is there some other way to reset offsets programmatically?
Upvotes: 3
Views: 4321
Reputation: 1
In Kafka streams there's a trick - you can assign a random value to your APPLICATION_ID_CONFIG if this works for you. Then always you get a new read from the beginning but it's a dirty way.
Upvotes: 0
Reputation: 478
Building on Hans Jespersens answer, I successfully used this code to do what the script does in Java code:
import kafka.tools.StreamsResetter;
StreamsResetter resetter = new StreamsResetter();
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER};
resetter.run(args);
The class is part of the kafka core library that I imported in maven using:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
Upvotes: 10
Reputation: 8335
Since you are using Kafka Streams you will want to reset not only the consumer offsets but also the Streams internal state store.
Fortunately there is a Streams Application Reset Tool provided with Kafka.
See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
Upvotes: 2