Amir Afianian
Amir Afianian

Reputation: 2795

What is the best practice for keeping Kafka consumer alive in python?

Something is puzzling for me when it comes to keeping consumers alive. Let's say I have a topic to which data is constantly being written. But, in an hour in a day, there are no new messages. If I had set a timeout for my consumers, when there are no new messages, the consumer will get closed.

Now, new messages arrive. But, there are not consumers alive to consume them.

How should I handle such scenarios? My consumers may consume all messages and get closed. What is the best way to keep them alive? Is there any way to invoke them automatically upon the arrival of new messages? What are the best practices for such scenarios?

Upvotes: 11

Views: 7409

Answers (2)

NotoriousPyro
NotoriousPyro

Reputation: 647

Use a generator function

def consumableMessages(self):
    self.kafka.subscribe(self.topic)
    try:
        for message in self.kafka:
            yield message.value.decode("utf-8")
    except KeyboardInterrupt:
        self.kafka.close()

And then we can wait for messages:

for message in kafka.consumableMessages():
    print(message)

Upvotes: 1

Giorgos Myrianthous
Giorgos Myrianthous

Reputation: 39810

Why not just

import time
from confluent_kafka import Consumer


consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

while True:
    try: 
        message = consumer.poll(10.0)

        if not message:
            time.sleep(120) # Sleep for 2 minutes

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {msg.value().decode('utf-8')}")
    except:
        # Handle any exception here
        ...
    finally:
        consumer.close()
        print("Goodbye")

I cannot comment on the requirement of "setting a timeout for consumers", but in most of the cases consumers are supposed to run "forever" and should also be added to consumer groups in a way that they are highly available.

Upvotes: 3

Related Questions