I'm using kafka-python to read from a topic on a Kafka broker, but I can't get the consumer iterator to return anything:
from kafka import KafkaConsumer

consumer = KafkaConsumer("topic", bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup")
for record in consumer:
    print(record)
It just seems to hang. I've verified that the topic exists, that it has data on the broker, and that new data is being produced. When I add auto_offset_reset="earliest" to the KafkaConsumer constructor call, everything works as expected and the consumer iterator returns records. The default value for this parameter is "latest", but with that value I can't seem to consume any data.
Why would this be the case?
You need to set auto_offset_reset='earliest' when instantiating KafkaConsumer (older kafka-python releases called this value 'smallest', which newer releases still accept as a deprecated alias). It is the equivalent of --from-beginning for the command-line tool kafka-console-consumer.sh, i.e.:
consumer = KafkaConsumer("topic", bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup", auto_offset_reset='earliest')
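The reason you see no data with the default is that auto_offset_reset only applies when the consumer group has no committed offset for a partition (or its committed offset is no longer in the log). With 'latest', a new group such as "mygroup" starts at the end of each partition, so every record already in the topic is skipped and only records produced after the consumer has been assigned its partitions are returned. If nothing is produced while your consumer is running, the iterator simply blocks. Setting 'earliest' tells the consumer to start from the oldest record still retained in the topic instead, even if no data is being written at the moment.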
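To make that decision concrete, here is a minimal pure-Python sketch of how a consumer picks its starting position for a single partition. The function starting_offset and its arguments are hypothetical, for illustration only; it models the rule described above (a valid committed offset wins, otherwise the reset policy decides) and does not talk to a broker or use kafka-python.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: str = "latest") -> int:
    """Return the offset a consumer would begin fetching from for one partition.

    Hypothetical model: `committed` is the group's committed offset (None if the
    group has never committed), and offsets log_start..log_end-1 are the records
    currently retained on the broker (log_end is the next offset to be written).
    """
    # A committed offset that still points into the retained log is used as-is;
    # the reset policy is not consulted at all.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset: fall back to the reset policy.
    if auto_offset_reset == "earliest":
        return log_start  # replay everything still retained
    if auto_offset_reset == "latest":
        return log_end  # skip existing records, wait for new ones
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


# A brand-new group on a partition holding offsets 0..99:
print(starting_offset(None, 0, 100))            # 100 (default "latest")
print(starting_offset(None, 0, 100, "earliest"))  # 0
print(starting_offset(42, 0, 100, "earliest"))  # 42 (committed offset wins)
```

With the default 'latest', the new group starts at 100 and only sees records written after that point. Note also that kafka-python auto-commits offsets by default (enable_auto_commit=True), so once "mygroup" has committed, later runs resume from the committed position whatever auto_offset_reset says.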
According to the official documentation:
The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.
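The quoted mechanism (the consumer sends its own offset with each fetch and can rewind by changing it) can be illustrated with a toy in-memory model. PartitionLog and ToyConsumer are hypothetical classes written for this sketch, not part of kafka-python or Kafka.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PartitionLog:
    """Toy stand-in for one partition's log on the broker (hypothetical)."""
    records: List[str] = field(default_factory=list)

    def fetch(self, offset: int, max_records: int) -> List[Tuple[int, str]]:
        # The "broker" answers with a chunk of the log starting at `offset`.
        chunk = self.records[offset:offset + max_records]
        return [(offset + i, value) for i, value in enumerate(chunk)]


@dataclass
class ToyConsumer:
    """Consumer that holds its own position and sends it with every fetch."""
    log: PartitionLog
    position: int = 0

    def poll(self, max_records: int = 2) -> List[Tuple[int, str]]:
        batch = self.log.fetch(self.position, max_records)
        # Advance the locally held position past what was returned.
        self.position += len(batch)
        return batch

    def seek(self, offset: int) -> None:
        # Rewinding just changes the offset sent with the next fetch.
        self.position = offset


log = PartitionLog(["a", "b", "c"])
consumer = ToyConsumer(log)
print(consumer.poll())   # [(0, 'a'), (1, 'b')]
print(consumer.poll())   # [(2, 'c')]
consumer.seek(0)
print(consumer.poll(3))  # [(0, 'a'), (1, 'b'), (2, 'c')]
```

In kafka-python the corresponding real calls are KafkaConsumer.seek(partition, offset) and KafkaConsumer.seek_to_beginning(), which operate on TopicPartition objects currently assigned to the consumer.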