Reputation: 10956
Assuming I have two topics (both with two partitions and infinite retention):
my_topic_a
my_topic_b
and one consumer group:
my_consumer
At some point, it was consuming both topics, but due to some changes it is no longer interested in my_topic_a, so it stopped consuming it and is now accumulating lag:
kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
my_topic_a  0          300000          400000          100000  -            -     -
my_topic_a  1          300000          400000          100000  -            -     -
my_topic_b  0          500000          500000          0       -            -     -
my_topic_b  1          500000          500000          0       -            -     -
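For lag tracking, the per-topic totals can be derived from output like the above. This is a minimal sketch that assumes the simplified column layout shown here (topic in field 1, LAG in field 5); real output may include a leading GROUP column, which would shift the field numbers. lag_per_topic is a hypothetical helper, not part of Kafka:

```shell
# Sample copied from the --describe output above.
describe_output='TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -'

# Hypothetical helper: sums the LAG column (field 5) per topic (field 1),
# skipping the header row and any line with fewer than five fields.
lag_per_topic() {
  awk '$1 != "TOPIC" && NF >= 5 { lag[$1] += $5 }
       END { for (t in lag) print t, lag[t] }' | sort
}

printf '%s\n' "$describe_output" | lag_per_topic
```

With the sample above, my_topic_a totals 200000 and my_topic_b totals 0, which matches the stale lag described in the question.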
This lag is annoying me because:
So I want to get rid of the offsets that my_consumer holds for my_topic_a, to reach a state as if my_consumer had never consumed my_topic_a.
The following attempt fails:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer --delete --topic my_topic_a
With this output:
The consumer does not support topic-specific offset deletion from a consumer group.
How can I achieve my goal? (Temporarily stopping all consumers of this group would be a feasible option in my use-case.)
(I'm using Kafka version 2.2.0.)
My guess is that something can be done by writing to the __consumer_offsets topic, but I don't know what that would be. Currently, this topic looks as follows (again, simplified):
kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
Upvotes: 7
Views: 11183
Reputation: 10956
In the meantime (Kafka 2.8), this has become possible with the new --delete-offsets parameter of kafka-consumer-groups.sh. :-)
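A minimal sketch of how that invocation might look, using the group and topic names from the question. It assumes a Kafka distribution whose kafka-consumer-groups.sh ships --delete-offsets, and that the group is not actively subscribed to the topic when the command runs. The delete_topic_offsets wrapper and the KAFKA_RUN switch are hypothetical, added only so that the command is printed for review by default:

```shell
# Hypothetical wrapper around kafka-consumer-groups.sh --delete-offsets.
# Usage: delete_topic_offsets GROUP TOPIC [BOOTSTRAP_SERVER]
# Prints the command by default; set KAFKA_RUN=1 to actually execute it.
delete_topic_offsets() {
  dto_group="$1"
  dto_topic="$2"
  dto_bootstrap="${3:-kafka:9092}"   # assumed broker address, as in the question

  # Build the command line as the function's positional parameters.
  set -- kafka-consumer-groups.sh \
    --bootstrap-server "$dto_bootstrap" \
    --group "$dto_group" \
    --delete-offsets \
    --topic "$dto_topic"

  if [ "${KAFKA_RUN:-0}" = "1" ]; then
    "$@"          # run against the cluster
  else
    echo "$*"     # dry run: only show what would be executed
  fi
}

delete_topic_offsets my_consumer my_topic_a
```

Afterwards, kafka-consumer-groups.sh --describe for the group should no longer list my_topic_a, while the offsets for my_topic_b stay untouched.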
Upvotes: 5
Reputation: 18505
The output you received:
"The consumer does not support topic-specific offset deletion from a consumer group."
indicates that removing the offsets of a specific topic from a consumer group is not supported.
You could change the consumer group of the new application that reads only my_topic_b, restart the application, and then remove the old, idle consumer group completely. With that approach you can track consumer lag without distractions or spurious alerts. When restarting the application with a new consumer group, it is usually best to stop the producer for my_topic_b during the restart so that you do not miss any messages: a group without committed offsets starts at the position given by auto.offset.reset, which defaults to latest.
I would really avoid manually tampering with the __consumer_offsets topic.
As an alternative, you could regularly run a command-line tool that ships with Kafka to reset the lag of your consumer group:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my_consumer --topic my_topic_a --to-latest
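You need to add the --execute option to actually apply the reset; without it, the tool only performs a dry run and prints the offsets it would set. The reset also requires that the group has no active members while the command runs.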
Upvotes: 0