Reputation: 5149
I am trying to read the last message from a Kafka topic, but I cannot make it work. I tried the different methods below, listed with their errors or problems.
Topic description:
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic 52.5_13.4 --describe
Topic: 52.5_13.4 TopicId: VFJtIO-UQBiktUeO5uVQ7w PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: 52.5_13.4 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
And the topic has multiple messages that I can get via the client.
Setup:
from kafka import KafkaConsumer, TopicPartition
import json
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset="earliest", enable_auto_commit=True, value_deserializer=lambda x: json.loads(x.decode("utf-8")), consumer_timeout_ms=600,)
topic = TopicPartition('52.5_13.4', 0)
Method 1
consumer.subscribe('52.5_13.4')
consumer.poll()
consumer.seek_to_end()
Throws
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/.local/lib/python3.10/site-packages/kafka/consumer/group.py", line 865, in seek_to_end
assert partitions, 'No partitions are currently assigned'
AssertionError: No partitions are currently assigned
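A possible explanation, as far as I understand kafka-python: subscribe() only registers interest in the topic, and partitions are assigned during later poll() calls, so a single poll() with the default timeout of 0 ms can return before anything is assigned. A minimal sketch of waiting for the assignment before seeking follows. The helper name wait_for_assignment and its parameters are made up for illustration and are not part of kafka-python; it only relies on the consumer's poll() and assignment() methods.

```python
import time


def wait_for_assignment(consumer, timeout_s=10.0, poll_ms=200):
    """Poll until the consumer reports assigned partitions, or time out.

    `consumer` is expected to behave like kafka.KafkaConsumer, i.e. to offer
    poll(timeout_ms=...) and assignment(). Hypothetical helper, not a
    kafka-python API.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        assigned = consumer.assignment()
        if assigned:
            return assigned
        if time.monotonic() >= deadline:
            raise TimeoutError("no partitions assigned within %.1f s" % timeout_s)
        # poll() drives the client's background work; any records it returns
        # are dropped here, which is acceptable because we seek afterwards.
        consumer.poll(timeout_ms=poll_ms)
```

With that in place, Method 1 would become consumer.subscribe('52.5_13.4'), then wait_for_assignment(consumer), then consumer.seek_to_end().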
Method 2
consumer.assign([topic])
consumer.poll()
consumer.seek_to_end()
last_msg = None
for msg in consumer:
    print("Inside consumer")
    last_msg = msg.value
    print("+++")
This prints nothing, not even the print before msg.value. Once the consumer timeout expires, it simply exits the for loop.
Method 3
consumer = KafkaConsumer('52.5_13.4', bootstrap_servers='localhost:9092', enable_auto_commit=True, value_deserializer=lambda x: json.loads(x.decode("utf-8")),consumer_timeout_ms=60000,)
topic = TopicPartition('52.5_13.4', 0)
consumer.poll()
consumer.seek_to_end(topic)
for message in consumer:
    print(message)
Again, this prints nothing and exits the loop after the timeout.
Upvotes: 0
Views: 3024
Reputation: 5149
I couldn't make seek_to_end work, but combining end_offsets and seek does the job:
from kafka import KafkaConsumer, TopicPartition
import json
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset="earliest", enable_auto_commit=True, value_deserializer=lambda x: json.loads(x.decode("utf-8")), consumer_timeout_ms=600,)
consumer.subscribe('52.5_13.4')
partition = TopicPartition('52.5_13.4', 0)
end_offset = consumer.end_offsets([partition])
consumer.seek(partition,list(end_offset.values())[0]-1)
for m in consumer:
    print(m.value)
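For reuse, the same idea can be wrapped in a small function. This is only a sketch under a few assumptions: it uses assign() rather than subscribe(), so the partition is assigned immediately, it guards against an empty partition (where end offset - 1 would be negative), and it calls poll() once instead of iterating. The names read_last_record and demo are hypothetical.

```python
import json


def read_last_record(consumer, partition, timeout_ms=5000):
    """Return the last record of `partition`, or None if nothing can be read.

    `consumer` is expected to behave like kafka.KafkaConsumer (assign,
    end_offsets, seek, poll). Hypothetical helper, not a kafka-python API.
    """
    # assign() takes effect immediately, unlike subscribe().
    consumer.assign([partition])
    # end_offsets() maps each partition to the offset of the *next* record.
    end = consumer.end_offsets([partition])[partition]
    if end == 0:
        return None  # nothing has been written to this partition
    consumer.seek(partition, end - 1)
    # poll() returns a dict of {partition: [records]}.
    batches = consumer.poll(timeout_ms=timeout_ms)
    records = batches.get(partition, [])
    return records[-1] if records else None


def demo():
    # Needs kafka-python and a broker at localhost:9092; never called on import.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    )
    record = read_last_record(consumer, TopicPartition("52.5_13.4", 0))
    print(record.value if record else "no record available")
    consumer.close()
```

One caveat: on compacted or transactional topics the offset end - 1 may not hold a readable record, in which case this sketch returns None rather than searching further back.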
Upvotes: 2
Reputation: 191710
In each method, your consumer is going to be sitting at the end of the topic or assigned partitions, waiting for a new record.
You need to call seek_to_end, or query for the high watermark of the partitions, then seek() to that position - 1, and poll.
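The seek_to_end route described above could look roughly like this. It is a sketch, assuming that position() resolves the pending seek_to_end() and returns the offset of the next record to be fetched, which is how I read the kafka-python documentation. The helper name rewind_to_last_record is made up.

```python
def rewind_to_last_record(consumer, partition):
    """Position `partition` on its last record; return False if it is empty.

    `consumer` is expected to behave like kafka.KafkaConsumer (assign,
    seek_to_end, position, seek). Hypothetical helper, not a kafka-python API.
    """
    consumer.assign([partition])
    consumer.seek_to_end(partition)
    # position() reports where the next fetch would start, i.e. one past the
    # last record once the seek to the end has been resolved.
    next_offset = consumer.position(partition)
    if next_offset == 0:
        return False  # empty partition, nothing to step back to
    consumer.seek(partition, next_offset - 1)
    return True
```

After it returns True, a consumer.poll(timeout_ms=...) or a for loop over the consumer should yield the last record first.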
Upvotes: 0