Reputation: 11218
I'm using the confluent_kafka package to work with Kafka. I create a topic this way:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def my_producer():
    bootstrap_servers = ['my_adress.com:9092',
                         'my_adress.com:9092']
    value_schema = avro.load('/home/ValueSchema.avsc')

    avroProducer = AvroProducer({
        'bootstrap.servers': ','.join(bootstrap_servers),
        'schema.registry.url': 'http://my_adress.com:8081',
        },
        default_value_schema=value_schema
    )

    for i in range(0, 25000):
        value = {"name": "Yuva", "favorite_number": 10, "favorite_color": "green", "age": i*2}
        avroProducer.produce(topic='my_topik14', value=value)
        avroProducer.flush(0)  # note: with a timeout of 0 this does not block
    print('Finished!')


if __name__ == '__main__':
    my_producer()
It works (though it lands 24820 messages instead of 25000, by the way...). We can check it:
kafka-run-class kafka.tools.GetOffsetShell --broker-list my_adress.com:9092 --topic my_topik14
my_topik14:0:24819
Now I want to consume:
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
bootstrap_servers = ['my_adress.com:9092',
                     'my_adress.com:9092']

c = AvroConsumer({
    'bootstrap.servers': ','.join(bootstrap_servers),
    'group.id': 'avroneversleeps',
    'schema.registry.url': 'http://my_adress.com:8081',
    'api.version.request': True,
    'fetch.min.bytes': 100000,
    'consume.callback.max.messages': 1000,
    'batch.num.messages': 2
})
c.subscribe(['my_topik14'])
running = True

while running:
    msg = None
    try:
        msg = c.poll(0.1)
        if msg:
            if not msg.error():
                print(msg.value())
                c.commit(msg)
            elif msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                running = False
        else:
            print("No Message!! Happily trying again!!")
    except SerializerError as e:
        print("Message deserialization failed for %s: %s" % (msg, e))
        running = False

c.commit()
c.close()
But there is a problem: I read messages only one by one. My question is: how do I read a batch of messages? I tried different parameters in the consumer config, but they didn't change anything!
I also found this question on SO and tried the same parameters - it still doesn't work.
I also read this, but it contradicts the previous link...
Upvotes: 2
Views: 4198
Reputation: 333
You can do it with the consume([num_messages=1][, timeout=-1])
method. API reference here:
For Consumer: https://docs.confluent.io/current/clients/confluent-kafka-python/index.html#confluent_kafka.Consumer.consume
For AvroConsumer: https://docs.confluent.io/current/clients/confluent-kafka-python/index.html?highlight=avroconsumer#confluent_kafka.Consumer.consume
More about the issue here:
https://github.com/confluentinc/confluent-kafka-python/issues/252
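For example, the per-message poll() loop from the question can be replaced with a consume() call that returns up to num_messages at once. This is a sketch, not the question's exact setup: process_batch is a hypothetical helper added here, and the consumer `c` and topic name are assumed to be configured as in the question.

```python
def process_batch(messages):
    # Hypothetical helper: keep the values of messages that carry no error.
    # consume() may also return error "messages" (e.g. partition EOF events),
    # so filtering on error() is needed before using value().
    return [m.value() for m in messages if m.error() is None]

# With a running broker and an AvroConsumer `c` configured as in the
# question, the batch-reading loop would look like:
#
# c.subscribe(['my_topik14'])
# while True:
#     batch = c.consume(num_messages=100, timeout=1.0)  # up to 100 messages
#     for value in process_batch(batch):
#         print(value)
```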
Upvotes: 1
Reputation: 11218
AvroConsumer has no consume
method. But it is easy to write my own implementation of this method, mirroring the one in the Consumer class (the parent of AvroConsumer).
Here is the code:
def consume_batch(self, num_messages=1, timeout=None):
    """
    Overridden method from the confluent_kafka.Consumer class. Handles
    deserialization of a batch of messages using the Avro schema.

    :param int num_messages: number of messages to read in one batch (default=1)
    :param float timeout: poll timeout in seconds (default: indefinite)
    :returns: list of Message objects with key and value deserialized into dicts
    :rtype: list(Message)
    """
    messages_out = []
    if timeout is None:
        timeout = -1
    messages = super(AvroConsumer, self).consume(num_messages=num_messages, timeout=timeout)
    if messages is None:
        return None
    for m in messages:
        # empty key and value: return the batch undecoded
        if not m.value() and not m.key():
            return messages
        if not m.error():
            if m.value() is not None:
                decoded_value = self._serializer.decode_message(m.value())
                m.set_value(decoded_value)
            if m.key() is not None:
                decoded_key = self._serializer.decode_message(m.key())
                m.set_key(decoded_key)
            messages_out.append(m)
    return messages_out
But after running tests, this method gave no performance increase. So it looks like it exists just for better usability. Or I need to do some additional work on deserializing not a single message but a whole batch at once.
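To illustrate that last point: in consume_batch above, decode_message still runs once per message, so batching amortizes only the poll round-trips, not the Avro deserialization itself. A self-contained sketch with a stand-in serializer (no broker needed; FakeSerializer and decode_batch are hypothetical names for this illustration):

```python
class FakeSerializer:
    """Stand-in for the schema-registry serializer; counts decode calls."""
    def __init__(self):
        self.calls = 0

    def decode_message(self, raw):
        self.calls += 1
        return {"decoded": raw}

def decode_batch(serializer, raw_messages):
    # Mirrors the loop in consume_batch: one decode call per message.
    return [serializer.decode_message(r) for r in raw_messages]

s = FakeSerializer()
out = decode_batch(s, [b"a", b"b", b"c"])
# 3 messages in -> 3 decode calls: the CPU cost of deserialization
# is unchanged by batching.
```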
Upvotes: 0