Reputation: 21
I have an Elasticsearch pipeline with a Kafka cluster sitting between two Logstash instances. I need to move a topic's offset back 12 hours and then restart the consumer. To find the offset at that point in time I ran:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kfkserver:9092 --topic topicname --time 1488153601000
which returns topicname:0:3730858 (topic:partition:offset).
Here 1488153601000 is 2017-02-27 00:00:01 UTC expressed as epoch milliseconds.
How can I set the consumer's offset back to that point in time?
Upvotes: 1
Views: 1553
Reputation: 206
If you're on 0.10.x and don't have the awesome offset management tool that was added in 0.11 (kafka-consumer-groups.sh --reset-offsets), there's a hack that uses kafka-console-consumer.sh to change a consumer group's offset. It only works with a numeric offset though, not a timestamp.
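Since the hack needs a numeric offset, the timestamp has to be translated into one first, for example with the GetOffsetShell command from the question. Here is a rough sketch of two helpers for that, assuming a bash-compatible shell and the topic:partition:offset output format shown in the question. The function names are made up for illustration and are not part of Kafka.

```shell
# Hypothetical helpers; the names are illustrative, not part of Kafka.

# Print epoch milliseconds for HOURS ago. An optional second argument
# overrides "now" (in epoch seconds), which makes the function testable.
hours_ago_ms() {
  hours="$1"
  now="${2:-$(date +%s)}"
  echo $(( (now - hours * 3600) * 1000 ))
}

# Split one GetOffsetShell output line ("topic:partition:offset")
# into "partition offset". Kafka topic names cannot contain ':'.
parse_offset_line() {
  echo "$1" | awk -F: '{ print $2, $3 }'
}

# Example with the line from the question:
parse_offset_line "topicname:0:3730858"   # prints "0 3730858"
```

With these, passing --time "$(hours_ago_ms 12)" to GetOffsetShell would ask for the offset 12 hours back, and parse_offset_line gives you the partition and offset to feed into the command below.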
First, stop whatever process is running that's using that consumer. Clean shutdown is best. Then, run a command that looks like this:
bin/kafka-console-consumer.sh --bootstrap-server :9092 \
--topic my-topic \
--partition 1 \
--consumer-property group.id=my-consumer-group \
--max-messages 0 \
--offset 12345
--max-messages 0 is important here: setting it to any other value, including 1, will consume that many messages and then commit the current latest offset in that topic/partition rather than the offset you asked for. This appears to be a bug in the console consumer.
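Because --partition takes a single partition, a topic with several partitions needs the command run once per partition. A minimal sketch of how that could be scripted, assuming "partition offset" pairs on stdin; the function name is hypothetical, and it only prints the commands so you can review them before running anything:

```shell
# Hypothetical helper: reads "partition offset" pairs from stdin and
# prints (does NOT run) one console-consumer command per pair.
# Arguments: topic, consumer group, bootstrap server (host:port).
build_reset_commands() {
  topic="$1"
  group="$2"
  server="$3"
  while read -r partition offset; do
    printf 'bin/kafka-console-consumer.sh --bootstrap-server %s --topic %s --partition %s --consumer-property group.id=%s --max-messages 0 --offset %s\n' \
      "$server" "$topic" "$partition" "$group" "$offset"
  done
}

# Example using the values from the question (placeholders for your own):
printf '0 3730858\n' | build_reset_commands topicname my-consumer-group kfkserver:9092
```

Printing instead of executing is deliberate: the consumer has to be stopped first, and a wrong offset is easier to catch in a printed command than after it has been committed.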
Next, check your work with kafka-consumer-groups.sh:
bin/kafka-consumer-groups.sh --bootstrap-server :9092 \
--group my-consumer-group \
--describe
Upvotes: 3