Reputation: 8014
I am using an external service (Service) to process some particular type of objects. The Service works faster if I send objects in batches of 10. My current architecture is as follows. A producer broadcasts objects one-by-one, and a bunch of consumers pull them (one-by-one) from a queue and send them to The Service. This is obviously suboptimal.
I don't want to modify producer code as it can be used in different cases. I can modify consumer code but only with the cost of additional complexity. I'm also aware of the prefetch_count
option but I think it only works on the network level -- the client library (pika) does not allow fetching multiple messages at once in the consumer callback.
So, can RabbitMQ create batches of messages before sending them to consumers? I'm looking for an option like "consume n messages at a time".
Upvotes: 10
Views: 7406
Reputation: 317
The below code will make use of channel.consume
to start consuming messages. We break out/stop when the desired number of messages is reached.
I have set a batch_size
to prevent pulling of huge number of messages at once. You can always change the batch_size
to fit your needs.
def consume_messages(queue_name: str):
msgs = list([])
batch_size = 500
q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
q_length = q.method.message_count
if not q_length:
return msgs
msgs_limit = batch_size if q_length > batch_size else q_length
try:
# Get messages and break out
for method_frame, properties, body in channel.consume(queue_name):
# Append the message
try:
msgs.append(json.loads(bytes.decode(body)))
except:
logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")
# Acknowledge the message
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop when desired msgs are fetched
if method_frame.delivery_tag == msgs_limit:
# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
break
except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
logger.info(f'Connection Interrupted: {str(e)}')
finally:
# Close the channel and the connection
channel.stop_consuming()
channel.close()
return msgs
Upvotes: 0