Reputation: 2271
Very new to kafka and Avro. I am stuck with a problem and can not seem to figure out what is going wrong here. I had written a producer and consumer of kafka which uses Avro as the serialization format. The producer code is working properly. As after running that code when I run the kafka-avro-console-consumer
it give me as following -
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test --property schema.registry.url=http://127.0.0.1:8081 --from-beginning
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
However, when I try to do the same using python (following this the most basic example ) I write the following code -
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
class AvroConsumerAdapter(object):
def __init__(self, topic='test'):
self.topic = topic
self.consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://127.0.0.1:8081',
'group.id': 'mygroup'})
self.consumer.subscribe([topic])
def start_consuming(self):
running = True
while running:
try:
msg = self.consumer.poll(10)
if msg:
print(msg.value())
if not msg.error():
print("Here - 1")
print(msg.value())
elif msg.error().code() != KafkaError._PARTITION_EOF:
print("here-2")
print(msg.error())
running = False
else:
print('Here-3')
print(msg.error())
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
except Exception as ex:
print(ex)
running = False
self.consumer.close()
This client stays there forever and never prints anything. I am not sure what is wrong here. Can anyone please help me in this.
Upvotes: 0
Views: 3961
Reputation: 32080
Check out the topic config options -- you need to set auto.offset.reset': 'smallest'
if you want to process all of the data currently in the topic. By default it's largest
which means it'll only show new rows of data produced. You can verify this by leaving your current Python code running and producing new messages to the topic - you should see the Python code pick them up.
Upvotes: 1