void

Reputation: 2543

RMQ asynchronous consumer: pika.adapters.base_connection, _handle_error, Fatal Socket Error: error(9, 'Bad file descriptor')

I have implemented a RabbitMQ (version 3.2.4) asynchronous consumer (as described here) that listens to a queue/routing key. It was running without any issues until I recently made some changes.

Certain tasks are time-consuming, so I decided to use the multiprocessing library to offload them to subprocesses, using a multiprocessing Queue/Pool design, so that the main consumer can keep going without waiting for them.

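import multiprocessing
my_queue = multiprocessing.Queue()
# Pool(processes, initializer, initargs): each of the 2 workers runs my_method(my_queue) at startup
my_pool = multiprocessing.Pool(2, my_method, (my_queue,))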

Once the queue and pool are initialized, I pass the queue as an argument when initializing the consumer (in ExampleConsumer's __init__ method, as in the example linked above). Then, within the on_message method, I push messages onto my_queue so that the pool workers perform the time-intensive tasks.
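The question does not show my_method. As a minimal sketch of what such a pool initializer could look like (worker_loop, process_body, stop_workers and the None sentinel are hypothetical names, not the asker's code), each worker blocks on the shared queue and returns once it receives a stop sentinel:

```python
import logging

LOGGER = logging.getLogger(__name__)

# Hypothetical sentinel: one is put on the queue per worker to ask it to stop.
STOP = None


def process_body(body):
    """Hypothetical placeholder for the time-consuming work."""
    LOGGER.info("Processing %s", body)


def worker_loop(task_queue, handle=process_body):
    """Pool initializer: consume message bodies until the STOP sentinel arrives.

    Intended use: multiprocessing.Pool(2, worker_loop, (my_queue,)).
    Returning from the initializer lets the pool worker finish normally.
    """
    while True:
        body = task_queue.get()  # blocks until the consumer puts something
        if body is STOP:
            break
        try:
            handle(body)
        except Exception:
            # An exception escaping the initializer would kill this worker
            # process (and Pool would start a replacement), so log and continue.
            LOGGER.exception("Failed to process %r", body)


def stop_workers(task_queue, n_workers):
    """Put one sentinel per worker so that every worker_loop call returns."""
    for _ in range(n_workers):
        task_queue.put(STOP)
```

With this shape, main() would call stop_workers(my_queue, 2) before my_pool.close() and my_pool.join(); without the sentinels, join() waits on workers that are still blocked in get() inside the initializer.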

Edit:

Some sample code:

def main():
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    my_queue = multiprocessing.Queue()
    my_pool = multiprocessing.Pool(2, my_class().my_method, (my_queue,))
    example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F', my_queue)
    try:
        example.run()
        my_pool.close()
        my_pool.join()
    except KeyboardInterrupt:
        my_pool.terminate()
        example.stop()

The consumer's __init__ and on_message methods:

def __init__(self, amqp_url, queue):
        """Create a new instance of the consumer class, passing in the AMQP
        URL used to connect to RabbitMQ.

        :param str amqp_url: The AMQP URL to connect with
        :param multiprocessing.Queue queue: Queue that on_message pushes message bodies onto

        """
        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._url = amqp_url
        self.queue = queue

def on_message(self, unused_channel, basic_deliver, properties, body):
        """Invoked by pika when a message is delivered from RabbitMQ. The
        channel is passed for your convenience. The basic_deliver object that
        is passed in carries the exchange, routing key, delivery tag and
        a redelivered flag for the message. The properties passed in is an
        instance of BasicProperties with the message properties and the body
        is the message that was sent.

        :param pika.channel.Channel unused_channel: The channel object
        :param pika.Spec.Basic.Deliver: basic_deliver method
        :param pika.Spec.BasicProperties: properties
        :param str|unicode body: The message body

        """
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)
        self.acknowledge_message(basic_deliver.delivery_tag)
        self.queue.put(str(body))

After making these changes, I started seeing the following exception:

File "consumer_new.py", line 500, in run
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 355, in start
    self.process_timeouts()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 283, in process_timeouts
    timer['callback']()
  File "consumer_new.py", line 290, in reconnect
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 354, in start
    self.poll()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 602, in poll
    self._process_fd_events(fd_event_map, write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 443, in _process_fd_events
    handler(fileno, events, write_only=write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 364, in _handle_events
    self._handle_read()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 415, in _handle_read
    self._on_data_available(data)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1347, in _on_data_available
    self._process_frame(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1427, in _process_frame
    self._deliver_frame_to_channel(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1028, in _deliver_frame_to_channel
    return self._channels[value.channel_number]._handle_content_frame(value)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 896, in _handle_content_frame
    self._on_deliver(*response)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 983, in _on_deliver
    header_frame.properties, body)
  File "consumer_new.py", line 452, in on_message
    self.acknowledge_message(basic_deliver.delivery_tag)
  File "consumer_new.py", line 463, in acknowledge_message
    self._channel.basic_ack(delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 159, in basic_ack
    return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1569, in _send_method
    self._send_frame(frame.Method(channel_number, method_frame))
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1554, in _send_frame
    self._flush_outbound()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 282, in _flush_outbound
    self._handle_write()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 452, in _handle_write
    return self._handle_error(error)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 338, in _handle_error
    self._handle_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 288, in _handle_disconnect
    self._adapter_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 94, in _adapter_disconnect
    self.ioloop.remove_handler(self.socket.fileno())
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 579, in remove_handler
    super(PollPoller, self).remove_handler(fileno)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 328, in remove_handler
    self.update_handler(fileno, 0)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 571, in update_handler
    self._poll.modify(fileno, events)
IOError: [Errno 9] Bad file descriptor

The run() method keeps running in the main process without any intervention. If that is the case, I don't understand why a Bad File Descriptor error would arise, since nothing else should be able to close the RabbitMQ connection. Also, the consumer runs without any issues for 3-4 hours before it fails with the above error.

I checked the RabbitMQ management UI to see whether the server is running short of file descriptors, but that doesn't seem to be the problem. I can't find any lead on what might be causing this.

Any help is appreciated! Thanks.

Upvotes: 0

Views: 983

Answers (1)

Hannu

Reputation: 12205

Pika is not thread-safe, and the documentation says so clearly. If you do anything with your connections or channels from threads or subprocesses, all sorts of things will eventually go wrong and your program will crash with strange, uninformative errors. It may seem to work for a while, but eventually Pika's internal structures get corrupted.
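Because this kind of misuse may only surface hours later as an unrelated-looking socket error, it can help to make it fail fast. A minimal sketch, assuming you wrap your connection or channel in a hypothetical OwnerGuard helper (not part of Pika) and always fetch it through get():

```python
import os
import threading


class OwnerGuard:
    """Hand out the wrapped object only to the process and thread that created it.

    Hypothetical helper; Pika does not provide this check.
    """

    def __init__(self, obj):
        self._obj = obj
        self._owner_pid = os.getpid()
        self._owner_thread = threading.get_ident()

    def get(self):
        # A forked child inherits the object but has a different PID.
        if os.getpid() != self._owner_pid:
            raise RuntimeError(
                "used from pid %d, owner pid is %d" % (os.getpid(), self._owner_pid)
            )
        # Another thread in the same process has a different thread identifier.
        if threading.get_ident() != self._owner_thread:
            raise RuntimeError("used from a thread other than the owning thread")
        return self._obj
```

In the consumer this would mean creating the guard where the channel is opened and calling, for example, self._channel_guard.get().basic_ack(delivery_tag) instead of touching self._channel directly.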

If you need multiprocessing and RabbitMQ, you have several options:

  1. Use rabbitpy instead of Pika. I have not used it, so I cannot comment on its suitability for you, but it is thread-safe.
  2. If you can, separate the tasks so that your subprocesses open their own Pika connections. This does not work if your main program receives a request, hands it to a subprocess for processing, and then has to send a result. For example, if you need to send an ack, your subprocesses cannot ack messages that were received in the main process.
  3. Remove Pika from the subprocesses. If the purpose of your subprocesses is to run calculations or other time-consuming tasks, create two queues, one for subprocess input and one for output, and have the subprocesses return their results to the main program through the output queue. The main program can then handle all RabbitMQ traffic based on those results.
  4. If your program is a server of some kind that processes requests, split everything into subprocesses (the "work queue" model, https://www.rabbitmq.com/tutorials/tutorial-two-python.html) and have every subprocess subscribe independently as a consumer to the queue. RabbitMQ takes care of round-robin dispatch, and by limiting prefetch you can make each subprocess pick up exactly one task and nothing else until that task is finished, so tasks sent right after the first one are picked up by idle threads or subprocesses. In this model your main program does not need a Pika connection at all, and every subprocess has its own independent connection, as in 2).
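Option 3 could look roughly like the following sketch. It assumes on_message puts (basic_deliver.delivery_tag, body) on the task queue instead of acknowledging right away, and that the main process calls drain_results periodically, for example from a timer on the consumer's ioloop (connection.ioloop.call_later in pika 1.x). process_tasks, drain_results and the None sentinel are hypothetical names; the ack and nack callbacks would wrap channel.basic_ack and channel.basic_nack in the main process, so only that process ever touches Pika.

```python
import queue

STOP = None  # hypothetical sentinel telling a worker to exit


def process_tasks(task_queue, result_queue, work):
    """Runs in a subprocess. Never touches Pika, only the two queues."""
    while True:
        item = task_queue.get()
        if item is STOP:
            break
        delivery_tag, body = item
        try:
            result, ok = work(body), True
        except Exception as exc:
            result, ok = repr(exc), False
        # Report back to the main process, which owns the Pika channel.
        result_queue.put((delivery_tag, ok, result))


def drain_results(result_queue, ack, nack):
    """Runs in the main (Pika) process: ack or nack everything finished so far.

    Non-blocking, so it can be called periodically from the ioloop.
    Returns the number of results handled in this call.
    """
    handled = 0
    while True:
        try:
            delivery_tag, ok, _result = result_queue.get_nowait()
        except queue.Empty:
            return handled
        if ok:
            ack(delivery_tag)
        else:
            nack(delivery_tag)
        handled += 1
```

Because acks are now deferred, RabbitMQ keeps delivering messages up to the channel's prefetch limit, so setting one with channel.basic_qos(prefetch_count=...) keeps the backlog of unacknowledged messages bounded.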

Hope this helps.

Hannu

Upvotes: 4
