AK_sat

Reputation: 101

Consume Data from Confluent Kafka Topic and Exit using Python

I am trying to write Python code to consume data from a Confluent Kafka topic and validate it as part of a testing project. I can read the data, but the consume loop runs forever, and I am looking for a condition that lets me exit once the loop has read all the messages.

Sample code is below:

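import uuid
from confluent_kafka import Consumer, KafkaError

conf = {'bootstrap.servers': "server:port",
    'group.id': str(uuid.uuid1()),  # fresh consumer group on every run
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'session.timeout.ms': 6000
    }
consumer = Consumer(conf)
# topic and on_assign are defined elsewhere
consumer.subscribe([topic], on_assign=on_assign)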
try:
    while True:
        msg = consumer.poll(timeout=2.0)

        if msg is None:
            # No message arrived within the poll timeout
            print('msg is None')
            continue

        # error() returns None for a normal message, so check it before calling code()
        if msg.error() is not None:
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {0}/{1}'
                      .format(msg.topic(), msg.partition()))
                print(msg.error().code())
            else:
                print(msg.error())
        else:
            print(msg.value())

except KeyboardInterrupt:
    print("Interrupted through kb")

finally:
    consumer.close()

Please advise on the best way to decide that all the messages have been read, so that I can exit the loop and close the consumer.

Upvotes: 2

Views: 2311

Answers (2)

clued__init__

Reputation: 191

It looks like you are missing the enable.partition.eof=True configuration, which makes the consumer emit an end-of-partition event (delivered as an error with code _PARTITION_EOF) when the end of a partition is reached. I accomplished what you are asking by first determining which partitions exist on the topic I am consuming, and then exiting once I have reached the end of each partition.
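
A minimal sketch of that idea, assuming the confluent_kafka Python client and a single consumer in its own group (so every partition is assigned to it). The names consume_until_eof, eof_code, main and the topic my-topic are made up for illustration. The consumer must be created with 'enable.partition.eof': True, otherwise the end-of-partition event is never delivered.

```python
def consume_until_eof(consumer, topic, eof_code, poll_timeout=2.0):
    """Collect message values until every partition of `topic` has reported EOF.

    `consumer` is expected to behave like a confluent_kafka.Consumer that was
    created with 'enable.partition.eof': True and is subscribed to `topic`.
    `eof_code` should be KafkaError._PARTITION_EOF.
    """
    # Ask the cluster which partitions the topic has.
    metadata = consumer.list_topics(topic, timeout=10)
    remaining = set(metadata.topics[topic].partitions.keys())

    values = []
    while remaining:
        msg = consumer.poll(timeout=poll_timeout)
        if msg is None:
            continue  # nothing arrived within the timeout; keep waiting
        err = msg.error()
        if err is None:
            values.append(msg.value())
        elif err.code() == eof_code:
            # This partition is fully read; stop waiting for it.
            remaining.discard(msg.partition())
        else:
            raise RuntimeError(err)
    return values


def main():
    # Imported here so the helper above does not need the client at import time.
    import uuid
    from confluent_kafka import Consumer, KafkaError

    topic = 'my-topic'  # hypothetical topic name
    consumer = Consumer({
        'bootstrap.servers': 'server:port',
        'group.id': str(uuid.uuid1()),
        'auto.offset.reset': 'earliest',
        'enable.partition.eof': True,  # required for end-of-partition events
    })
    consumer.subscribe([topic])
    try:
        for value in consume_until_eof(consumer, topic, KafkaError._PARTITION_EOF):
            print(value)
    finally:
        consumer.close()
```

Calling main() against a real broker would print every message currently in the topic and then return. Note that "end of partition" only reflects the moment the event was emitted; a producer may still append messages afterwards.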

Upvotes: 0

Robin Moffatt

Reputation: 32090

Apache Kafka topics are, by definition, unbounded streams of events. There is no "end" to a stream, only an artificial end that you may choose to define.

You'll need to define this within your application logic. For example, if you don't receive a message for more than <x> seconds, consider it the "end", and stop consuming.
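
As a rough sketch of the idle-timeout idea, assuming a consumer object with the poll() interface of confluent_kafka.Consumer: the helper name consume_until_idle and its idle_seconds parameter are invented for illustration, and the right threshold depends on how often your producers write.

```python
import time


def consume_until_idle(consumer, idle_seconds=10.0, poll_timeout=1.0):
    """Collect message values until no message has arrived for `idle_seconds`."""
    values = []
    last_activity = time.monotonic()
    while time.monotonic() - last_activity < idle_seconds:
        msg = consumer.poll(timeout=poll_timeout)
        if msg is None:
            continue  # quiet poll; the idle clock keeps running
        if msg.error() is not None:
            raise RuntimeError(msg.error())
        values.append(msg.value())
        last_activity = time.monotonic()  # a message arrived, so reset the idle clock
    return values
```

With a real consumer this would be called after subscribe(), for example consume_until_idle(consumer, idle_seconds=30), followed by consumer.close(). Keeping poll_timeout smaller than idle_seconds means the loop notices the idle limit without waiting much past it.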

Upvotes: 1
