BrianBola

Reputation: 21

Kafka 0.10.1.0 change offset time

I have an Elasticsearch pipeline with a Kafka cluster sitting between two Logstash instances. I need to rewind the offset for a topic by 12 hours and then start the consumer again.

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kfkserver:9092 --topic topicname --time 1488153601000

which returns topicname:0:3730858 (topic:partition:offset)

1488153601000 is 2017-02-27 00:00:01 UTC in epoch milliseconds.

How can I set the consumer's offset to match that time?
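The millisecond value passed to --time can be computed rather than worked out by hand. A minimal sketch, assuming GNU date from coreutils (BSD/macOS date takes different flags) and assuming the wall-clock time is meant as UTC:

```shell
# Assumption: GNU date (coreutils). Convert a UTC wall-clock time to
# epoch milliseconds, the unit that --time expects.
ts_ms=$(( $(date -u -d '2017-02-27 00:00:01' +%s) * 1000 ))
echo "$ts_ms"   # prints 1488153601000

# The same idea for "12 hours ago", measured from the current time.
twelve_hours_ago_ms=$(( ($(date -u +%s) - 12 * 3600) * 1000 ))
echo "$twelve_hours_ago_ms"
```

Either value can then be substituted for the literal after --time in the GetOffsetShell command above.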

Upvotes: 1

Views: 1553

Answers (1)

Mike Fischer

Reputation: 206

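If you're on 0.10.x and don't have the awesome offset reset support that was added to kafka-consumer-groups.sh in 0.11, there's a hack that uses kafka-console-consumer.sh to change a consumer group's offset. This only works with a numeric offset though, not a timestamp, so look up the offset for your timestamp first (for example with GetOffsetShell, as in the question).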

First, stop whatever process is running that's using that consumer. Clean shutdown is best. Then, run a command that looks like this:

bin/kafka-console-consumer.sh --bootstrap-server :9092 \
    --topic my-topic \
    --partition 1 \
    --consumer-property group.id=my-consumer-group \
    --max-messages 0 \
    --offset 12345

--max-messages 0 is important here; setting it to any other value, including 1, will consume that many messages and then commit the current latest offset in that topic/partition. This must be a bug in the console consumer.
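For a topic with several partitions, the command has to be repeated once per partition. The following is a rough sketch of a dry run that turns topic:partition:offset lines, in the format GetOffsetShell printed in the question, into one reset command each. It only prints the commands and does not touch Kafka. The group name and the sample line are placeholders, and the flags are exactly those used above.

```shell
# Placeholders: substitute your own consumer group and GetOffsetShell output.
group='my-consumer-group'
offsets='topicname:0:3730858'

# Build one reset command per topic:partition:offset line.
# Dry run: the commands are printed, not executed.
cmds=$(printf '%s\n' "$offsets" | while IFS=: read -r topic partition offset; do
  printf 'bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic %s --partition %s --consumer-property group.id=%s --max-messages 0 --offset %s\n' \
    "$topic" "$partition" "$group" "$offset"
done)
echo "$cmds"
```

Review the printed commands before running them, and only run them while the real consumer is stopped.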

Next, check your work with kafka-consumer-groups.sh:

bin/kafka-consumer-groups.sh --bootstrap-server :9092 \
    --group my-consumer-group \
    --describe

Upvotes: 3
