Reputation: 847
I have the following program to consume all the messages coming to Kafka:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print("%s key=%s value=%s" % (message.topic, message.key,
                                  message.value))
consumer.close()
Using the above program, I am able to consume all the messages coming to Kafka. But once all messages are consumed, I want to close the Kafka consumer, and that is not happening. I need help with this.
Upvotes: 12
Views: 22067
Reputation: 847
I am now able to close the Kafka consumer by passing the consumer_timeout_ms argument to the KafkaConsumer object. It takes a timeout value in milliseconds. Below is the code snippet:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print("%s key=%s value=%s" % (message.topic, message.key,
                                  message.value))
consumer.close()
In the above code, if the consumer doesn't see any message for 1 second, the iteration stops, the for loop exits, and consumer.close() closes the session.
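A minimal sketch of the same pattern, relying on kafka-python's documented behaviour that iteration ends (StopIteration) once `consumer_timeout_ms` elapses without a message. Wrapping the loop in `contextlib.closing` makes sure `close()` also runs if processing raises. `drain` and its `handle` callback are hypothetical names used for illustration.

```python
from contextlib import closing


def drain(consumer, handle):
    """Handle messages until the consumer's iterator stops, then close it.

    Assumes `consumer` is a kafka-python KafkaConsumer created with
    consumer_timeout_ms set, so the for loop ends after that idle period.
    `handle` is a hypothetical per-message callback.
    """
    count = 0
    with closing(consumer):  # calls consumer.close() even if handle() raises
        for message in consumer:
            handle(message)
            consumer.commit()  # commit only after the message has been handled
            count += 1
    return count


# Example usage (needs a reachable broker; not executed here):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer('my_test_topic',
#                          group_id='my-group',
#                          bootstrap_servers=['my_kafka:9092'],
#                          consumer_timeout_ms=1000)
# drain(consumer, lambda m: print(m.topic, m.key, m.value))
```

Committing after handling, rather than before as in the snippet above, means a crash mid-processing leads to the message being redelivered instead of skipped.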
Upvotes: 21
Reputation: 320
I think the accepted answer here is not exactly accurate, so here is my take on this:
You can just add a condition and, if it is met, break out of the for loop:
for message in consumer:
    if condition:
        break
In your case, you want to stop when all messages are consumed, so you have to find a way to tell the consumer that all messages have arrived.
For example, you could produce a message that carries that information, and then your condition would check whether the consumed message is the one reporting that all messages have arrived.
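A sketch of the end-of-stream marker idea. The marker value `END_MARKER` and the function name are hypothetical; producer and consumer just have to agree on them. Kafka only guarantees ordering within a partition, so this assumes a single-partition topic; with several partitions the marker could overtake messages still sitting in other partitions.

```python
END_MARKER = b"__END_OF_STREAM__"  # hypothetical value agreed with the producer


def consume_until_marker(consumer, handle, marker=END_MARKER):
    """Handle messages until the end-of-stream marker arrives, then close.

    Assumes raw bytes values (no value_deserializer) and a single-partition
    topic, because Kafka orders messages only within a partition.
    `handle` is a hypothetical per-message callback.
    """
    try:
        for message in consumer:
            if message.value == marker:
                break  # the producer says nothing more is coming
            handle(message)
    finally:
        consumer.close()


# Producer side with kafka-python (not executed here):
# producer.send('my_test_topic', END_MARKER)
# producer.flush()
```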
Another option, mentioned in an earlier answer, is to assume that if no message arrives for a certain amount of time (1 second was suggested, but at least a few seconds is probably safer), no more messages are coming.
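The idle-timeout idea can also be written with kafka-python's `KafkaConsumer.poll(timeout_ms=...)`, which returns a dict mapping each `TopicPartition` to a list of records (an empty dict when nothing arrived). This sketch stops only after several consecutive empty polls, since the first polls after subscribing may come back empty while the consumer is still joining its group. `consume_until_idle`, `handle`, and the default values are hypothetical choices.

```python
def consume_until_idle(consumer, handle, poll_timeout_ms=1000, max_empty_polls=5):
    """Poll until max_empty_polls consecutive polls return no records.

    Uses kafka-python's KafkaConsumer.poll(timeout_ms=...), which returns a
    dict {TopicPartition: [records]} that is empty when nothing arrived.
    `handle` is a hypothetical per-record callback.
    """
    empty_polls = 0
    try:
        while empty_polls < max_empty_polls:
            batches = consumer.poll(timeout_ms=poll_timeout_ms)
            if not batches:
                empty_polls += 1
                continue
            empty_polls = 0  # data arrived, so reset the idle counter
            for records in batches.values():
                for record in records:
                    handle(record)
            consumer.commit()  # commit after the whole batch was handled
    finally:
        consumer.close()
```

With the defaults, the consumer gives up after roughly five seconds of silence, which matches the "a few seconds" suggestion without relying on a single short timeout.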
The way I did it was by checking that every ID I expected had been received at least once (ignoring duplicates). That requires knowing exactly what you are receiving, plus some extra logic that is probably beyond the scope of this question, but I found it a very useful and elegant way to decide when to stop consuming. Here is some of the code you would need:
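received = 0  # number of distinct IDs seen so far (avoids shadowing the built-in sum)
data = {
    0: None,
    1: None,
    2: None,
    3: None
}
for message in consumer:
    payload = message.value  # assumes a value_deserializer that returns a dict (e.g. JSON)
    unique_id = payload["unique_id"]
    if data[unique_id] is None:  # first copy of this ID; later duplicates are ignored
        data[unique_id] = payload
        received += 1
    if len(data) == received:  # every expected ID has arrived
        break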
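The same ID-tracking idea in a slightly more self-contained form: a set of expected IDs plus a dict of received payloads, which also skips IDs that were not expected instead of raising `KeyError`. As in the snippet above, it assumes `message.value` has already been deserialized into a dict with a `unique_id` field; the function name is hypothetical.

```python
def consume_until_all_ids(consumer, expected_ids):
    """Keep the first payload seen for each expected ID; stop when all are in.

    Assumes message.value is a dict with a "unique_id" key (e.g. produced by a
    JSON value_deserializer). Duplicates and unexpected IDs are ignored.
    """
    expected = set(expected_ids)
    received = {}
    try:
        for message in consumer:
            payload = message.value
            unique_id = payload["unique_id"]
            if unique_id in expected and unique_id not in received:
                received[unique_id] = payload
            if len(received) == len(expected):  # every expected ID has arrived
                break
    finally:
        consumer.close()
    return received
```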
A much easier way, if you know how many messages you will be consuming, is to use enumerate like this:
amount_of_messages_to_be_consumed = 40  # as an example, 40
for index, message in enumerate(consumer):
    # process message here
    if index + 1 == amount_of_messages_to_be_consumed:
        break  # index is 0-based, so stop right after the last expected message
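`itertools.islice` expresses "take exactly n messages" without any index arithmetic. It stops as soon as it has yielded n items, so it never asks the consumer for an (n+1)th message that might never arrive. `consume_n` and `handle` are hypothetical names.

```python
from itertools import islice


def consume_n(consumer, n, handle):
    """Handle exactly n messages from the consumer, then close it.

    `handle` is a hypothetical per-message callback.
    """
    try:
        for message in islice(consumer, n):  # stops after n, never asks for n + 1
            handle(message)
    finally:
        consumer.close()
```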
Of course, after you break out of the for loop, you can and should close the consumer (but you were probably just stuck on getting out of the endless for loop):
consumer.close()
Upvotes: 0
Reputation: 173
The Kafka configuration parameter enable.partition.eof is what you need. Note that it is a librdkafka property, so it applies to clients built on librdkafka such as confluent-kafka-python, not to the kafka-python library used in the question. When it is set to true, the consumer emits a PARTITION_EOF event whenever it reaches the end of a partition, so you can tell when each partition has been fully read. That way, you can choose to close the consumer once you have reached the end of all partitions.
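A sketch of that approach with confluent-kafka-python. In that client the end-of-partition event arrives as a message from `poll()` whose `error().code()` equals `KafkaError._PARTITION_EOF`, so the loop below records which partitions have hit EOF and stops once that covers the consumer's whole current assignment. The function name and `handle` callback are hypothetical, and the EOF code is taken as a parameter (pass `KafkaError._PARTITION_EOF`).

```python
def consume_until_all_eof(consumer, handle, eof_code, timeout=1.0):
    """Poll until every assigned partition has reported end-of-partition.

    Assumes a confluent_kafka.Consumer created with 'enable.partition.eof': True
    and already subscribed; pass confluent_kafka.KafkaError._PARTITION_EOF as
    eof_code. Keeps polling while no partitions are assigned yet.
    `handle` is a hypothetical per-message callback.
    """
    at_eof = set()
    try:
        while True:
            msg = consumer.poll(timeout)
            if msg is None:
                continue  # nothing within timeout, e.g. still joining the group
            err = msg.error()
            key = (msg.topic(), msg.partition())
            if err is None:
                at_eof.discard(key)  # new data on this partition
                handle(msg)
            elif err.code() == eof_code:
                at_eof.add(key)
                assigned = {(tp.topic, tp.partition) for tp in consumer.assignment()}
                if assigned and assigned <= at_eof:
                    break  # caught up on every partition we own
            else:
                raise RuntimeError(f"Kafka error: {err}")
    finally:
        consumer.close()


# Example setup (needs confluent-kafka and a broker; not executed here):
# from confluent_kafka import Consumer, KafkaError
# consumer = Consumer({'bootstrap.servers': 'my_kafka:9092',
#                      'group.id': 'my-group',
#                      'enable.partition.eof': True})
# consumer.subscribe(['my_test_topic'])
# consume_until_all_eof(consumer, print, KafkaError._PARTITION_EOF)
```

"End of partition" here means the consumer has caught up with what was in the partition at that moment; messages produced afterwards will not be seen once the consumer is closed.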
Upvotes: 3
Reputation: 2313
It looks like you want consumer.close() instead of KafkaConsumer.close(). It's not documented as a static method.
Upvotes: 2