Reputation: 95
I am using the consumer from https://github.com/confluentinc/confluent-kafka-go. The Kafka version is 0.10.1.0.
Here is my consumer configuration:
kafkaClient, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               broker,
    "group.id":                        "udwg20",
    "session.timeout.ms":              60000,
    "go.events.channel.enable":        true,
    "go.application.rebalance.enable": true,
    "default.topic.config": kafka.ConfigMap{
        "auto.offset.reset":       "earliest",
        "enable.auto.commit":      true,
        "auto.commit.interval.ms": 10000,
    },
})
At first, the current offset and lag were shown for all partitions, but after running for several hours, the offset and lag of some partitions (those that have not received any new messages) become unknown. If a message arrives on a partition whose offset and lag are unknown, the offset and lag become visible again and the message is consumed.
When some partitions have an unknown current offset and lag and I restart the consumer, all of those partitions are consumed again from the beginning, while the other partitions seem to run normally.
I also ran a Python consumer that consumes messages from this topic with a different consumer group id. The Python consumer seems to work well, with no partitions showing an unknown current offset and lag.
Upvotes: 2
Views: 4494
Reputation: 95
I used the commands below to check whether the offsets of my consumer group are committed periodically.
echo exclude.internal.topics=false > consumer.properties
kafka-console-consumer --consumer.config consumer.properties --from-beginning --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
Although I set enable.auto.commit to true, the consumer does not commit periodically for partitions whose lag is 0. The current offset for those partitions is removed after 2 to 3 hours, even though the consumer group is still active.
To solve this issue, I set enable.auto.commit to false and wrote my own function that commits offsets every 5 seconds.
Here is the idea:
When the consumer gets a new Message event or a PartitionEOF (end of partition) event, I take the latest current offset from the event data and store it in a commit map (key: topic_partition, value: kafka.TopicPartition{ Topic, Partition, Offset }). A separate function commits this map periodically (for example, every 5 seconds). When the consumer gets a RevokedPartitions event, I remove the corresponding topic_partition keys from the commit map.
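The commit map described above could be sketched roughly as follows. This is only an illustration of the bookkeeping, not the code I actually run: TopicPartition here is a simplified local stand-in for kafka.TopicPartition (so the snippet compiles without the client library), and CommitMap, Track, Revoke, Snapshot and Flush are hypothetical names. The commit callback is assumed to wrap the consumer's own commit call.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// TopicPartition is a simplified stand-in for kafka.TopicPartition
// (assumption: the real type uses *string for Topic and kafka.Offset for Offset).
type TopicPartition struct {
	Topic     string
	Partition int32
	Offset    int64
}

// CommitMap keeps the latest offset seen for each assigned partition.
type CommitMap struct {
	mu      sync.Mutex
	offsets map[string]TopicPartition
}

// NewCommitMap returns an empty commit map.
func NewCommitMap() *CommitMap {
	return &CommitMap{offsets: make(map[string]TopicPartition)}
}

// key builds the "topic_partition" key used in the map.
func key(topic string, partition int32) string {
	return fmt.Sprintf("%s_%d", topic, partition)
}

// Track records the latest offset for a partition.
// Intended to be called on Message and PartitionEOF events.
func (c *CommitMap) Track(tp TopicPartition) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.offsets[key(tp.Topic, tp.Partition)] = tp
}

// Revoke forgets the given partitions.
// Intended to be called on RevokedPartitions events.
func (c *CommitMap) Revoke(parts []TopicPartition) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, tp := range parts {
		delete(c.offsets, key(tp.Topic, tp.Partition))
	}
}

// Snapshot returns a copy of the tracked offsets, sorted by topic and partition.
func (c *CommitMap) Snapshot() []TopicPartition {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make([]TopicPartition, 0, len(c.offsets))
	for _, tp := range c.offsets {
		out = append(out, tp)
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Topic != out[j].Topic {
			return out[i].Topic < out[j].Topic
		}
		return out[i].Partition < out[j].Partition
	})
	return out
}

// Flush hands the current snapshot to the commit callback.
// It does nothing when no partitions are tracked.
func (c *CommitMap) Flush(commit func([]TopicPartition) error) error {
	snap := c.Snapshot()
	if len(snap) == 0 {
		return nil
	}
	return commit(snap)
}
```

In the real consumer, Flush would be called from a time.Ticker loop (every 5 seconds in my case) with a callback that converts the entries to kafka.TopicPartition and passes them to the consumer's CommitOffsets method. Keep in mind that a committed offset in Kafka means "next offset to read", so for Message events the stored value should probably be the message offset plus one.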
Upvotes: 0
Reputation: 39800
offsets.retention.minutes is used to clean up inactive consumer groups. If a consumer group does not commit any offset for offsets.retention.minutes (defaults to 24h), Kafka will clean up its offsets. This is why the offset and lag are shown as unknown.
You can increase the offset retention period; however, be aware that the offsets of old consumer groups will then keep occupying space in the __consumer_offsets topic.
Upvotes: 1