Reputation: 1645
I found how to reset my LAG with the kafka-consumer-groups.sh tool (see "How to change start offset for topic?"), but I need to reset it from within the application. I found the example below (from "kafka-python read from last produced message after a consumer restart"), but it doesn't seem to reset it.
consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
                         enable_auto_commit=False,
                         group_id="MyTopic.group")
consumer.poll()
consumer.seek_to_end()
consumer.commit()
# ... continue on with other code ...
Running bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group MyTopic.group --describe
still shows that both partitions have a LAG. How can I get the CURRENT-OFFSET to "fast-forward" to the end?
TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                              HOST    CLIENT-ID
MyTopic  0          52110           66195           14085  kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22  /Host1  kafka-python-1.4.2
MyTopic  1          52297           66565           14268  kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b  /Host2  kafka-python-1.4.2
Upvotes: 5
Views: 16152
Reputation: 4156
You may want this:
from kafka import KafkaConsumer, TopicPartition

broker_list = ["localhost:9092"]  # assumption: replace with your broker address(es)

def consumer_from_offset(topic, group_id, offset):
    """Return a consumer positioned at the given offset of partition 0."""
    consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id=group_id)
    tp = TopicPartition(topic=topic, partition=0)
    consumer.assign([tp])
    consumer.seek(tp, offset)
    return consumer

consumer = consumer_from_offset('topic', 'group', 0)
for msg in consumer:
    # consumes messages starting from offset 0
    print(msg)
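To jump to the end of every partition instead of a fixed offset, the same assign/seek pattern can probably be combined with end_offsets() and commit(). This is only a sketch: fast_forward_to_end and example are hypothetical names, the broker address is a placeholder, and it assumes kafka-python and that no other member of the group is active while it runs (a commit from a manually assigned consumer may be rejected otherwise).

```python
def fast_forward_to_end(consumer, partitions):
    """Seek each partition to its log end and commit those positions.

    `consumer` is expected to behave like a kafka-python KafkaConsumer that
    was created with a group_id and has not subscribed to any topic.
    `partitions` is a list of TopicPartition objects.
    Returns the {partition: end_offset} mapping that was committed.
    """
    consumer.assign(partitions)
    end_offsets = consumer.end_offsets(partitions)
    for tp, offset in end_offsets.items():
        # An explicit seek gives the consumer a concrete position to commit.
        consumer.seek(tp, offset)
    consumer.commit()  # commits the positions set by seek() above
    return end_offsets


def example():
    # Hypothetical usage; needs kafka-python and a reachable broker.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                             group_id="MyTopic.group",
                             enable_auto_commit=False)
    partition_ids = consumer.partitions_for_topic("MyTopic") or set()
    tps = [TopicPartition("MyTopic", p) for p in sorted(partition_ids)]
    print(fast_forward_to_end(consumer, tps))
    consumer.close()
```

Calling example() against a live cluster and then re-running the --describe command should show whether the committed offsets moved.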
Upvotes: 4
Reputation: 1459
To "fast forward" the offset of a consumer group, that is, to clear the LAG, you need to create a new consumer that joins the same group.
The console command for that is:
kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>
In parallel, you can run the --describe command from your question to watch the lag, and you will see it cleared.
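The lag can also be checked from inside the application rather than with the command-line tool. The sketch below assumes kafka-python, where committed() returns the group's committed offset (or None if nothing was committed) and end_offsets() returns the log-end offsets; partition_lag and example are hypothetical names, and the broker address is a placeholder.

```python
def partition_lag(consumer, partitions):
    """Return {partition: lag}, where lag = log-end offset - committed offset.

    `consumer` is expected to behave like a kafka-python KafkaConsumer that
    was created with the group_id of interest. The lag is None for a
    partition on which the group has not committed anything yet.
    """
    end_offsets = consumer.end_offsets(partitions)
    lag = {}
    for tp in partitions:
        committed = consumer.committed(tp)
        lag[tp] = None if committed is None else end_offsets[tp] - committed
    return lag


def example():
    # Hypothetical usage; needs kafka-python and a reachable broker.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                             group_id="MyTopic.group",
                             enable_auto_commit=False)
    tps = [TopicPartition("MyTopic", 0), TopicPartition("MyTopic", 1)]
    for tp, lag in partition_lag(consumer, tps).items():
        print(tp.partition, lag)
    consumer.close()
```

With the numbers from the question, partition 0 would give 66195 - 52110 = 14085, matching the LAG column.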
Upvotes: 3