Mikhail_Sam

Reputation: 11218

Poll several messages from Kafka

I'm using the confluent_kafka package to work with Kafka. I produce messages to a topic like this:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def my_producer():
    bootstrap_servers=['my_adress.com:9092',
                    'my_adress.com:9092']

    value_schema = avro.load('/home/ValueSchema.avsc')

    avroProducer = AvroProducer({
        'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
        'schema.registry.url':'http://my_adress.com:8081',
        },
        default_value_schema=value_schema
        )

    for i in range(0, 25000):
        value = {"name":"Yuva","favorite_number":10,"favorite_color":"green","age":i*2}
        avroProducer.produce(topic='my_topik14', value=value)
        avroProducer.flush(0)
    print('Finished!')


if __name__ == '__main__':
    my_producer()

It works (although the topic ends up with 24820 messages instead of 25000, by the way). We can check it:

kafka-run-class kafka.tools.GetOffsetShell --broker-list my_adress.com:9092 --topic my_topik14
my_topik14:0:24819
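
A possible explanation for the missing messages, offered as an assumption rather than a confirmed diagnosis: flush(0) uses a zero timeout, so it returns immediately instead of waiting for delivery, and anything still queued when the script exits is lost. A minimal sketch of the usual pattern follows. The helper name produce_all is hypothetical, and the producer argument is only assumed to expose produce(), poll() and flush() the way confluent_kafka.Producer and AvroProducer do.

```python
def produce_all(producer, topic, values):
    """Queue every value, then block until the local queue is drained.

    `producer` is assumed to behave like confluent_kafka.Producer or
    AvroProducer. Returns whatever flush() returns, which for those
    classes is the number of messages still waiting in the queue.
    """
    for value in values:
        producer.produce(topic=topic, value=value)
        # Serve delivery callbacks without blocking the loop.
        producer.poll(0)
    # No timeout here: wait for all outstanding messages before returning.
    return producer.flush()
```

With the producer from the question this would be called as produce_all(avroProducer, 'my_topik14', values), where values is the list of dicts built in the loop.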

Now I want to consume:

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

bootstrap_servers=['my_adress.com:9092',
                   'my_adress.com:9092']
c = AvroConsumer(
    {'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
     'group.id': 'avroneversleeps',
     'schema.registry.url': 'http://my_adress.com:8081',
     'api.version.request': True,
     'fetch.min.bytes': 100000,
     'consume.callback.max.messages':1000,
     'batch.num.messages':2
     })
c.subscribe(['my_topik14'])
running = True

while running:
    msg = None
    try:
        msg = c.poll(0.1)
        if msg:
            if not msg.error():
                print(msg.value())
                c.commit(msg)
            elif msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                running = False
        else:
            print("No Message!! Happily trying again!!")
    except SerializerError as e:
        print("Message deserialization failed for %s: %s" % (msg, e))
        running = False
c.commit()
c.close()

But there is a problem: I read messages only one at a time. My question is: how do I read a batch of messages? I tried different parameters in the consumer config, but they didn't change anything!


I also found this question on SO and tried the same parameters, but it still doesn't work.

I also read this, but it contradicts the previous link...

Upvotes: 2

Views: 4198

Answers (2)

Mikhail_Sam

Reputation: 11218

AvroConsumer does not provide its own consume method, but it is easy to write one modelled on the consume method of the Consumer class (the parent of AvroConsumer). Here is the code:

def consume_batch(self, num_messages=1, timeout=None):
    """
    Batch counterpart of AvroConsumer.poll(), built on confluent_kafka.Consumer.consume().
    Reads a batch of messages and deserializes each one using the Avro schema.

    :param int num_messages: maximum number of messages to read in one batch (default=1)
    :param float timeout: Poll timeout in seconds (default: indefinite)
    :returns: list of message objects with deserialized key and value as dict objects
    :rtype: list(Message)
    """
    messages_out = []
    if timeout is None:
        timeout = -1
    messages = super(AvroConsumer, self).consume(num_messages=num_messages, timeout=timeout)
    if messages is None:
        return None
    for m in messages:
        # Empty messages and error events are passed through unchanged, so one
        # of them no longer stops the rest of the batch from being decoded and
        # the caller can still inspect m.error().
        if (not m.value() and not m.key()) or m.error():
            messages_out.append(m)
            continue
        if m.value() is not None:
            decoded_value = self._serializer.decode_message(m.value())
            m.set_value(decoded_value)
        if m.key() is not None:
            decoded_key = self._serializer.decode_message(m.key())
            m.set_key(decoded_key)
        messages_out.append(m)
    return messages_out

But when we ran a test afterwards, this method gave no performance increase. So it looks like it only improves usability. Or maybe I need to do some additional work so that the whole batch is deserialized at once rather than one message at a time.
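
One thing that may explain the unchanged speed: the underlying librdkafka client already prefetches messages into a local queue in the background, so poll() is mostly reading from memory, and the loop in the question also calls commit(msg) for every single message. The sketch below commits once per batch instead. It is not a measured fix, just an illustration: consume_in_batches and handle are hypothetical names, and the consumer argument is only assumed to expose consume(num_messages=..., timeout=...) and commit() the way confluent_kafka.Consumer does.

```python
def consume_in_batches(consumer, handle, batch_size=500, timeout=1.0,
                       max_empty_polls=3):
    """Read messages in batches and commit once per batch.

    `handle` is a caller-supplied callback invoked for every message that
    carries no error. The loop stops after `max_empty_polls` consecutive
    empty batches. Returns the number of messages handled.
    """
    handled = 0
    empty_polls = 0
    while empty_polls < max_empty_polls:
        batch = consumer.consume(num_messages=batch_size, timeout=timeout)
        if not batch:
            empty_polls += 1
            continue
        empty_polls = 0
        # Skip error events (for example end-of-partition notifications).
        good = [m for m in batch if m.error() is None]
        for m in good:
            handle(m)
        handled += len(good)
        if good:
            # One commit for the whole batch instead of one per message.
            consumer.commit()
    return handled
```

Called as consume_in_batches(c, lambda m: print(m.value())), this would replace the while loop from the question. Note that a plain consume() returns raw bytes, so Avro decoding still has to happen either in handle or in a method like consume_batch above.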

Upvotes: 0
