Tobias Hermann

Reputation: 10956

How to delete the consumer offset of a group for one specific topic

Assuming I have two topics, my_topic_a and my_topic_b (both with two partitions and infinite retention), and one consumer group, my_consumer.

At some point, the group was consuming both topics, but due to some changes it is no longer interested in my_topic_a. It stopped consuming that topic and is now accumulating lag on it:

kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a                           0          300000          400000          100000          -                                                            -               -
my_topic_a                           1          300000          400000          100000          -                                                            -               -
my_topic_b                           0          500000          500000          0               -                                                            -               -
my_topic_b                           1          500000          500000          0               -                                                            -               -

This lag is annoying me because:

Thus I want to get rid of the offsets for my_topic_a of my_consumer, to get to a state as if my_consumer had never consumed my_topic_a.

The following attempt fails:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic my_topic_a

With this output:

The consumer does not support topic-specific offset deletion from a consumer group.

How can I achieve my goal? (Temporarily stopping all consumers of this group would be a feasible option in my use-case.)

(I'm using Kafka version 2.2.0.)


My guess is, something can be done by writing something to topic __consumer_offsets, but I don't know what it would be. Currently, this topic looks as follows (again, simplified):

kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)

Upvotes: 7

Views: 11183

Answers (2)

Tobias Hermann

Reputation: 10956

In the meantime (Kafka 2.8), this has become possible with the new --delete-offsets option of kafka-consumer-groups.sh. :-)
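
For reference, a minimal sketch of such a call, using the group and topic names from the question. The bootstrap address kafka:9092 is also taken from the question; adjust all three values to your setup. As far as I understand, the broker rejects the deletion while the group is still actively subscribed to the topic, so the consumers should have stopped consuming my_topic_a first.

```shell
# Assumed values, taken from the question; adjust to your environment.
BOOTSTRAP_SERVER="kafka:9092"
GROUP="my_consumer"
TOPIC="my_topic_a"

# Build the command first so it can be reviewed before it is run.
DELETE_OFFSETS_CMD="kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --group ${GROUP} --delete-offsets --topic ${TOPIC}"
echo "${DELETE_OFFSETS_CMD}"

# After reviewing the printed command, run it (bash/sh word splitting):
# ${DELETE_OFFSETS_CMD}
```

Afterwards, --describe for the group should no longer list my_topic_a, while the offsets for my_topic_b stay untouched.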

Upvotes: 5

Michael Heil

Reputation: 18505

The output you are given:

"The consumer does not support topic-specific offset deletion from a consumer group."

indicates that it is not possible to remove a specific topic from a consumer group.

You could give the application that now reads only my_topic_b a new consumer group, restart it, and then delete the old, idle consumer group completely. That way you can track consumer lag without distraction and without alerts popping up. When restarting the application with a new consumer group, it is usually best to stop the producer for my_topic_b during the restart to make sure you do not miss any messages.
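
A rough sketch of the cleanup step, assuming the application has already been switched to its new group and the old group my_consumer has no active members left (as far as I know, the tool refuses to delete a group that still has members). The bootstrap address localhost:9092 is a placeholder.

```shell
# Assumed values; adjust to your environment.
BOOTSTRAP_SERVER="localhost:9092"
OLD_GROUP="my_consumer"

# Deletes the whole group, including its committed offsets for every topic.
DELETE_GROUP_CMD="bin/kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --delete --group ${OLD_GROUP}"
echo "${DELETE_GROUP_CMD}"

# After reviewing the printed command, run it (bash/sh word splitting):
# ${DELETE_GROUP_CMD}
```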

I would really avoid playing around manually with the topic __consumer_offsets.

As an alternative, you could regularly run a command-line tool that ships with Kafka to move the group's committed offsets for my_topic_a to the end of the log, which brings the lag back to zero:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my_consumer --topic my_topic_a --to-latest

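Without the --execute option the tool only prints the offsets it would set (a dry run), so you may need to add --execute to actually apply them.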
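
The two variants can be sketched as follows, assuming the same placeholder address and the names from the question. Note that, as far as I know, the tool only resets offsets while the group has no active members, so the consumers may have to be stopped briefly for each run.

```shell
# Assumed values; adjust to your environment.
BOOTSTRAP_SERVER="localhost:9092"
GROUP="my_consumer"
TOPIC="my_topic_a"

RESET_CMD="bin/kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --reset-offsets --group ${GROUP} --topic ${TOPIC} --to-latest"

# Preview: prints the new offsets per partition without committing them.
PREVIEW_CMD="${RESET_CMD} --dry-run"
echo "${PREVIEW_CMD}"

# Apply: commits the new offsets for the group.
APPLY_CMD="${RESET_CMD} --execute"
echo "${APPLY_CMD}"
```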

Upvotes: 0
