
Reputation: 2271

confluent-kafka based consumer in Python does not work

I am very new to Kafka and Avro, and I am stuck on a problem I cannot figure out. I have written a Kafka producer and consumer that use Avro as the serialization format. The producer code works properly: after running it, kafka-avro-console-consumer gives me the following:

bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test --property schema.registry.url=http://127.0.0.1:8081 --from-beginning
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}

However, when I try to do the same in Python (following the most basic example), using the following code:

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


class AvroConsumerAdapter(object):

    def __init__(self, topic='test'):
        self.topic = topic
        self.consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',
                                      'schema.registry.url': 'http://127.0.0.1:8081',
                                      'group.id': 'mygroup'})
        self.consumer.subscribe([topic])

    def start_consuming(self):
        running = True
        while running:
            try:
                msg = self.consumer.poll(10)
                if msg:
                    print(msg.value())
                    if not msg.error():
                        print("Here - 1")
                        print(msg.value())
                    elif msg.error().code() != KafkaError._PARTITION_EOF:
                        print("here-2")
                        print(msg.error())
                        running = False
                    else:
                        print('Here-3')
                        print(msg.error())
            except SerializerError as e:
                print("Message deserialization failed for %s: %s" % (msg, e))
                running = False
            except Exception as ex:
                print(ex)
                running = False

        self.consumer.close()

the client runs forever and never prints anything. I am not sure what is wrong here. Can anyone please help me with this?

Upvotes: 0

Views: 3961

Answers (1)

Robin Moffatt

Reputation: 32080

Check out the topic config options: you need to set 'auto.offset.reset': 'smallest' if you want to process all of the data currently in the topic. The default is largest, which means the consumer only sees messages produced after it starts. You can verify this by leaving your current Python code running and producing new messages to the topic; you should see the Python code pick them up.
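
For illustration, here is a minimal sketch of the consumer with that setting applied. The broker address, registry URL, group id and topic are copied from the question; the helper names build_consumer_config and consume are made up for this example. It assumes a client version that accepts auto.offset.reset at the top level of the config (older releases may expect it nested under 'default.topic.config') and that still ships confluent_kafka.avro.AvroConsumer.

```python
def build_consumer_config(from_beginning=True):
    """Build the AvroConsumer config; connection values are from the question."""
    config = {
        'bootstrap.servers': 'localhost:9092',
        'schema.registry.url': 'http://127.0.0.1:8081',
        'group.id': 'mygroup',
    }
    if from_beginning:
        # 'smallest' is an alias for 'earliest'. It only takes effect when the
        # consumer group has no committed offset for a partition.
        config['auto.offset.reset'] = 'smallest'
    return config


def consume(topic='test', timeout=10):
    """Poll the topic and print each decoded Avro value (hypothetical helper)."""
    # Imported lazily so build_consumer_config() works without the client installed.
    from confluent_kafka import KafkaError
    from confluent_kafka.avro import AvroConsumer

    consumer = AvroConsumer(build_consumer_config())
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(timeout)
            if msg is None:
                # Nothing arrived within the timeout; keep polling.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, not a failure.
                    continue
                print(msg.error())
                break
            print(msg.value())
    finally:
        consumer.close()
```

Calling consume() against the setup from the question should then print the existing records. If 'mygroup' has already committed offsets from an earlier run, the reset policy is ignored, so trying a fresh group.id may be necessary to see the old messages.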

Upvotes: 1
