Reputation: 2543
I have an asynchronous RabbitMQ (version 3.2.4) consumer, implemented as described here, that listens to a queue / routing key. It was running without any issues until I recently made some changes.
Certain tasks are time-consuming, so I decided to use the multiprocessing library to spin off sub-processes that handle these intensive tasks, using a multiprocessing Queue / Pool design, so that my main task proceeds without waiting.
my_queue = multiprocessing.Queue()
my_pool = multiprocessing.Pool(2, my_method, (my_queue,))
Once the queue and pool are initialised, I pass the queue as an argument when initializing the consumer (ExampleConsumer's __init__ method, as in the example link above). Then, within the on_message method, I push messages onto my_queue for the time-intensive tasks.
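For reference, the queue-and-workers design described above can be sketched without RabbitMQ at all. This is a minimal illustration, not the code from the question: it uses plain multiprocessing.Process workers instead of the Pool initializer, and the names worker, handle and SENTINEL are hypothetical.

```python
import multiprocessing

SENTINEL = None  # hypothetical marker that tells a worker to exit


def handle(body):
    """Stand-in for the time-consuming task (hypothetical)."""
    return body.upper()


def worker(task_queue, result_queue):
    """Process message bodies from task_queue until the sentinel arrives."""
    while True:
        body = task_queue.get()
        if body is SENTINEL:
            break
        result_queue.put(handle(body))


def main():
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker, args=(task_queue, result_queue))
        for _ in range(2)
    ]
    for proc in workers:
        proc.start()

    # In the consumer, on_message would do this put() for each delivery.
    bodies = ("first", "second", "third")
    for body in bodies:
        task_queue.put(body)

    # Drain the results before joining so no worker blocks on a full pipe.
    results = [result_queue.get() for _ in bodies]

    # One sentinel per worker lets every worker leave its loop.
    for _ in workers:
        task_queue.put(SENTINEL)
    for proc in workers:
        proc.join()
    print(sorted(results))


if __name__ == "__main__":
    main()
```

The workers only ever see plain strings taken from the queue; nothing in them refers to the connection or channel.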
Edit:
some code sample:
def main():
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    my_queue = multiprocessing.Queue()
    my_pool = multiprocessing.Pool(2, my_class().my_method, (my_queue,))
    example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F', my_queue)
    try:
        example.run()
        my_pool.close()
        my_pool.join()
    except KeyboardInterrupt:
        my_pool.terminate()
        example.stop()
The __init__ and on_message methods of the consumer:
def __init__(self, amqp_url, queue):
    """Create a new instance of the consumer class, passing in the AMQP
    URL used to connect to RabbitMQ.

    :param str amqp_url: The AMQP url to connect with

    """
    self._connection = None
    self._channel = None
    self._closing = False
    self._consumer_tag = None
    self._url = amqp_url
    self.queue = queue

def on_message(self, unused_channel, basic_deliver, properties, body):
    """Invoked by pika when a message is delivered from RabbitMQ. The
    channel is passed for your convenience. The basic_deliver object that
    is passed in carries the exchange, routing key, delivery tag and
    a redelivered flag for the message. The properties passed in is an
    instance of BasicProperties with the message properties and the body
    is the message that was sent.

    :param pika.channel.Channel unused_channel: The channel object
    :param pika.Spec.Basic.Deliver: basic_deliver method
    :param pika.Spec.BasicProperties: properties
    :param str|unicode body: The message body

    """
    LOGGER.info('Received message # %s from %s: %s',
                basic_deliver.delivery_tag, properties.app_id, body)
    self.acknowledge_message(basic_deliver.delivery_tag)
    self.queue.put(str(body))
After making these changes, I started seeing exceptions of the following type:
  File "consumer_new.py", line 500, in run
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 355, in start
    self.process_timeouts()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 283, in process_timeouts
    timer['callback']()
  File "consumer_new.py", line 290, in reconnect
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 354, in start
    self.poll()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 602, in poll
    self._process_fd_events(fd_event_map, write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 443, in _process_fd_events
    handler(fileno, events, write_only=write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 364, in _handle_events
    self._handle_read()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 415, in _handle_read
    self._on_data_available(data)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1347, in _on_data_available
    self._process_frame(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1427, in _process_frame
    self._deliver_frame_to_channel(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1028, in _deliver_frame_to_channel
    return self._channels[value.channel_number]._handle_content_frame(value)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 896, in _handle_content_frame
    self._on_deliver(*response)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 983, in _on_deliver
    header_frame.properties, body)
  File "consumer_new.py", line 452, in on_message
    self.acknowledge_message(basic_deliver.delivery_tag)
  File "consumer_new.py", line 463, in acknowledge_message
    self._channel.basic_ack(delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 159, in basic_ack
    return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1569, in _send_method
    self._send_frame(frame.Method(channel_number, method_frame))
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1554, in _send_frame
    self._flush_outbound()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 282, in _flush_outbound
    self._handle_write()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 452, in _handle_write
    return self._handle_error(error)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 338, in _handle_error
    self._handle_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 288, in _handle_disconnect
    self._adapter_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 94, in _adapter_disconnect
    self.ioloop.remove_handler(self.socket.fileno())
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 579, in remove_handler
    super(PollPoller, self).remove_handler(fileno)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 328, in remove_handler
    self.update_handler(fileno, 0)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 571, in update_handler
    self._poll.modify(fileno, events)
IOError: [Errno 9] Bad file descriptor
The run() method keeps running in the main process without any intervention. Given that, I don't understand why a Bad file descriptor error would arise, since nothing else should be able to close the RabbitMQ connection. Also, the consumer runs without any issues for 3-4 hours before failing with the above error.
I checked in the RabbitMQ UI whether the broker was running short of file descriptors, but that doesn't seem to be the problem. I have no lead on what the cause might be.
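The RabbitMQ UI reports the broker's descriptors; the consumer process has its own, separate limit. A rough way to look at the consumer side is sketched below. It assumes a Linux host (it reads /proc/self/fd), and fd_usage is a hypothetical helper name, not something from pika.

```python
import os
import resource


def fd_usage():
    """Return (open_fds, soft_limit, hard_limit) for the current process.

    open_fds is None when /proc/self/fd is not available (non-Linux).
    The count includes the descriptor used to read the directory itself,
    so treat it as approximate.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        open_fds = len(os.listdir("/proc/self/fd"))
    except OSError:
        open_fds = None
    return open_fds, soft, hard


if __name__ == "__main__":
    print("open=%s soft=%s hard=%s" % fd_usage())
```

Logging this periodically from the consumer would show whether the number of open descriptors grows over the hours before the failure.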
Any help is appreciated! Thanks.
Upvotes: 0
Views: 983
Reputation: 12205
Pika is not thread safe. It says so clearly in the documentation. All sorts of things will eventually go wrong, and your program will crash with weird and uninformative errors, if you do anything to your connections or channels in threads or subprocesses. It may seem to work for a while, but eventually Pika's structures will get corrupted.
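One way to catch this kind of misuse early is to wrap the channel in a small guard that remembers which process created it and refuses to be used from any other. The sketch below is only an illustration: ProcessBound is a hypothetical helper, not part of Pika, and FakeChannel merely stands in for a real channel so the example runs on its own.

```python
import os


class ProcessBound(object):
    """Forward attribute access to target, but only in the creating process."""

    def __init__(self, target):
        self._target = target
        self._owner_pid = os.getpid()

    def __getattr__(self, name):
        # __getattr__ runs only when normal lookup fails, so it sees the
        # wrapped object's attributes; guard our own names against recursion.
        if name in ("_target", "_owner_pid"):
            raise AttributeError(name)
        if os.getpid() != self._owner_pid:
            raise RuntimeError(
                "%s used in pid %d but created in pid %d"
                % (name, os.getpid(), self._owner_pid)
            )
        return getattr(self._target, name)


class FakeChannel(object):
    """Stand-in for a real channel (hypothetical, for the demo only)."""

    def basic_ack(self, delivery_tag):
        return delivery_tag


if __name__ == "__main__":
    channel = ProcessBound(FakeChannel())
    print(channel.basic_ack(1))  # same process, so the call is forwarded
```

With a forked worker, any accidental call through the wrapped channel raises a RuntimeError that names the offending process, instead of failing hours later with an unrelated-looking error.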
If you need multiprocessing and rabbitmq, you have a couple of options.
Hope this helps.
Hannu
Upvotes: 4