Reputation: 1320
I am currently using the asynchronous consumer example provided on the pika site, and was wondering whether one consumer can consume from two queues. The examples on the RabbitMQ site seem to cater only for one consumer per queue.
Upvotes: 0
Views: 1384
Reputation: 13
You only have to declare another queue (QUEUE_2 = 'another_queue') and modify a few methods. Here are the methods I modified:
    def on_exchange_declareok(self, unused_frame):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.

        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame

        """
        LOGGER.info('Exchange declared')
        self.setup_queue(self.QUEUE)
        self.setup_queue(self.QUEUE_2)

    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.

        :param str|unicode queue_name: The name of the queue to declare.

        """
        LOGGER.info('Declaring queue %s', queue_name)
        # Declare only the queue passed in; on_exchange_declareok calls this
        # method once per queue, so each queue is declared exactly once.
        self._channel.queue_declare(self.on_queue_declareok, queue_name)

    def on_queue_declareok(self, method_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.

        :param pika.frame.Method method_frame: The Queue.DeclareOk frame

        """
        # This callback fires once per declared queue. Queue.DeclareOk carries
        # the name of that queue, so bind only it rather than both queues.
        queue_name = method_frame.method.queue
        LOGGER.info('Binding %s to %s with %s',
                    self.EXCHANGE, queue_name, self.ROUTING_KEY)
        self._channel.queue_bind(self.on_bindok, queue_name,
                                 self.EXCHANGE, self.ROUTING_KEY)

    def start_consuming(self):
        """This method sets up the consumer by first calling
        add_on_cancel_callback so that the object is notified if RabbitMQ
        cancels the consumer. It then issues the Basic.Consume RPC command
        once per queue; each call returns the consumer tag that is used to
        uniquely identify that consumer with RabbitMQ. We keep the values to
        use them when we want to cancel consuming. The on_message method is
        passed in as a callback pika will invoke when a message is fully
        received.

        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.add_on_cancel_callback()
        self._channel.basic_qos(prefetch_count=1)
        # Keep one tag per queue. Assigning both to self._consumer_tag would
        # overwrite the first tag. _consumer_tag_2 is a new attribute, so
        # stop_consuming has to cancel it as well.
        self._consumer_tag = self._channel.basic_consume(self.on_message,
                                                         self.QUEUE)
        self._consumer_tag_2 = self._channel.basic_consume(self.on_message,
                                                           self.QUEUE_2)
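
One thing to watch with two queues: if on_bindok still calls start_consuming directly, as I believe it does in the pika example, it will run once per bind and register the consumers more than once. Below is a minimal sketch of one way around that, counting the outstanding binds and keeping a consumer tag per queue. It does not talk to RabbitMQ: FakeChannel is a hypothetical stand-in that only mimics the callback-first call shapes used above, and the names MultiQueueConsumer, QUEUES, bind_all, _binds_remaining and _consumer_tags are my own, not part of pika. The exchange, routing key and queue names are placeholders.

```python
class FakeChannel:
    """Hypothetical stand-in for a pika channel (not part of pika).

    It records calls and invokes callbacks immediately, so the control
    flow can be followed without a broker.
    """

    def __init__(self):
        self.bindings = []
        self.consumers = {}

    def queue_bind(self, callback, queue, exchange, routing_key):
        self.bindings.append((queue, exchange, routing_key))
        callback(None)  # pretend the Queue.BindOk frame arrived

    def basic_consume(self, consumer_callback, queue):
        tag = 'ctag-%d' % (len(self.consumers) + 1)
        self.consumers[tag] = queue
        return tag


class MultiQueueConsumer:
    EXCHANGE = 'message'                # placeholder values
    ROUTING_KEY = 'example.text'
    QUEUES = ('text', 'another_queue')

    def __init__(self, channel):
        self._channel = channel
        self._binds_remaining = len(self.QUEUES)
        self._consumer_tags = {}        # queue name -> consumer tag

    def bind_all(self):
        for queue_name in self.QUEUES:
            self._channel.queue_bind(self.on_bindok, queue_name,
                                     self.EXCHANGE, self.ROUTING_KEY)

    def on_bindok(self, unused_frame):
        # Start consuming only after the last bind has been confirmed.
        self._binds_remaining -= 1
        if self._binds_remaining == 0:
            self.start_consuming()

    def start_consuming(self):
        for queue_name in self.QUEUES:
            tag = self._channel.basic_consume(self.on_message, queue_name)
            self._consumer_tags[queue_name] = tag

    def on_message(self, unused_channel, basic_deliver, properties, body):
        pass  # message handling goes here


channel = FakeChannel()
consumer = MultiQueueConsumer(channel)
consumer.bind_all()
print(consumer._consumer_tags)
```

With the tags kept per queue, stop_consuming can loop over them and cancel each consumer in turn.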
Upvotes: 1