Eser Aygün
Eser Aygün

Reputation: 8014

Consume multiple messages at a time

I am using an external service (Service) to process some particular type of objects. The Service works faster if I send objects in batches of 10. My current architecture is as follows. A producer broadcasts objects one-by-one, and a bunch of consumers pull them (one-by-one) from a queue and send them to The Service. This is obviously suboptimal.

I don't want to modify producer code as it can be used in different cases. I can modify consumer code but only with the cost of additional complexity. I'm also aware of the prefetch_count option but I think it only works on the network level -- the client library (pika) does not allow fetching multiple messages at once in the consumer callback.

So, can RabbitMQ create batches of messages before sending them to consumers? I'm looking for an option like "consume n messages at a time".

Upvotes: 10

Views: 7406

Answers (1)

Prateek
Prateek

Reputation: 317

The below code will make use of channel.consume to start consuming messages. We break out/stop when the desired number of messages is reached.

I have set a batch_size to prevent pulling of huge number of messages at once. You can always change the batch_size to fit your needs.

    def consume_messages(queue_name: str):
    msgs = list([])
    batch_size = 500

    q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
    q_length = q.method.message_count
    
    if not q_length:
        return msgs

    msgs_limit = batch_size if q_length > batch_size else q_length

    try:
        # Get messages and break out
        for method_frame, properties, body in channel.consume(queue_name):

            # Append the message
            try:
                msgs.append(json.loads(bytes.decode(body)))
            except:
                logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")

            # Acknowledge the message
            channel.basic_ack(method_frame.delivery_tag)

            # Escape out of the loop when desired msgs are fetched
            if method_frame.delivery_tag == msgs_limit:

                # Cancel the consumer and return any pending messages
                requeued_messages = channel.cancel()
                print('Requeued %i messages' % requeued_messages)
                break

    except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
        logger.info(f'Connection Interrupted: {str(e)}')

    finally:
        # Close the channel and the connection
        channel.stop_consuming()
        channel.close()

    return msgs

Upvotes: 0

Related Questions