Reputation: 101
I am trying to write Python code to consume data from a Confluent Kafka topic and perform data validation as part of a testing project. I am able to read the data, however the consume loop runs indefinitely, and I need a decision point to exit once all the messages have been read.
See sample code below:
import uuid

from confluent_kafka import Consumer, KafkaError

conf = {'bootstrap.servers': "server:port",
        'group.id': str(uuid.uuid1()),  # note: uuid.uuid1() must be called, not passed as a function reference
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'session.timeout.ms': 6000
        }
consumer = Consumer(conf)
consumer.subscribe([topic], on_assign=on_assign)
try:
    while True:
        msg = consumer.poll(timeout=2.0)
        if msg is None:
            print('msg is None')
            continue
        if msg.error():  # msg.error() is None for normal messages, so guard before calling .code()
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {0}/{1}'
                      .format(msg.topic(), msg.partition()))
                print(msg.error().code())
        else:
            print(msg.value())
except KeyboardInterrupt:
    print("Interrupted through kb")
finally:
    consumer.close()
Please advise on the best way to decide whether all the messages have been read, so that I can exit the loop and close the consumer.
Upvotes: 2
Views: 2311
Reputation: 191
It looks like you are missing the enable.partition.eof=True
configuration, which makes the consumer emit an error event when the end of a partition is reached. I was able to accomplish your ask by first determining which partitions exist on the topic I am consuming. Then, once I have reached the end of every partition, I exit.
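A minimal sketch of that bookkeeping, written so the drain logic is independent of the client library (with confluent-kafka you would set 'enable.partition.eof': True in the consumer config and pass KafkaError._PARTITION_EOF as eof_code; the helper name and parameters here are illustrative, not part of any API):

```python
def drain_topic(consumer, partitions, eof_code, timeout=2.0):
    """Poll until every partition in `partitions` has reported end-of-partition.

    `consumer` is anything with a confluent-kafka-style poll(timeout=...);
    `eof_code` is the error code marking partition EOF. Returns the list of
    message values read. A sketch, not a drop-in utility.
    """
    remaining = set(partitions)
    values = []
    while remaining:
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            continue  # nothing arrived within the timeout; keep polling
        err = msg.error()
        if err is not None:
            if err.code() == eof_code:
                remaining.discard(msg.partition())  # this partition is drained
            # other error codes could be raised here instead of being skipped
            continue
        values.append(msg.value())
    return values
```

With confluent-kafka, the partition set could come from the topic metadata, e.g. `set(consumer.list_topics(topic).topics[topic].partitions)`, before calling `drain_topic(consumer, partitions, KafkaError._PARTITION_EOF)`.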
Upvotes: 0
Reputation: 32090
Apache Kafka topics are, by definition, unbounded streams of events. There is no "end" to a stream, only an artificial end that you may choose to define.
You'll need to define this within your application logic. For example, if you don't receive a message for more than <x>
seconds, consider it the "end", and stop consuming.
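One way to express that "no message for <x> seconds" rule as a sketch (the function name, the 2-second poll timeout, and the `clock` parameter are illustrative choices, not anything prescribed by the Kafka client):

```python
import time

def consume_until_idle(poll, idle_limit_s, clock=time.monotonic):
    """Yield messages from poll() until nothing arrives for idle_limit_s seconds.

    `poll` is any callable mimicking Consumer.poll(timeout=...): it returns a
    message or None. This is a sketch of the idle-timeout idea, not tied to
    confluent-kafka itself.
    """
    last_seen = clock()
    while True:
        msg = poll(2.0)
        if msg is None:
            if clock() - last_seen > idle_limit_s:
                return  # quiet for too long: treat this as the "end"
            continue
        last_seen = clock()
        yield msg
```

With confluent-kafka this could be driven as `list(consume_until_idle(lambda t: consumer.poll(timeout=t), 30))`, followed by `consumer.close()`.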
Upvotes: 1