Justin Thomas
Justin Thomas

Reputation: 5848

Never ending message loop: Same message redelivered in python rabbitmq consumer

I am using the Example consumer posted here:

http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html

The reason I was using ExampleConsumer was my connection to rabbitmq was failing when the work tasks started taking longer, where longer is greater than 10 minutes. The connection was saying closed after the long running task completed and the process was failing. It previously went through 1000 messages that took a minute or so fine.

ExampleConsumer seems to reconnect fine, however, in the acknowledge message the message isn't actually acknowledged because the connection is dead. It seems to return normally from the below acknowledge message method. It then attempts reconnection after which the message that was just finished gets redelivered.

  def acknowledge_message(self, delivery_tag):
        """Acknowledge the message delivery from RabbitMQ by sending a
        Basic.Ack RPC method for the delivery tag.

        :param int delivery_tag: The delivery tag from the Basic.Deliver frame

        """
        LOGGER.info('Acknowledging message %s', delivery_tag)
        self._channel.basic_ack(delivery_tag)

Upvotes: 1

Views: 2553

Answers (3)

Laurent90
Laurent90

Reputation: 293

if you are using the pika async consumer example, you just need to add this change to the init method :

    self._url = 'amqp://{}:{}@{}:{}/%2F{}'.format(
                     self.USERNAME, self.PASSWORD, self.ADDRESS, self.PORT, self.QUERY) 

with self.QUERY a string that can be parametrized to set different parameters, e.g heartbeat as follows :

self.QUERY ='?heartbeat_interval=600'

The connect method will take care of the heartbeat transaction.

def connect(self):
    """This method connects to RabbitMQ, returning the connection handle.
    When the connection is established, the on_connection_open method
    will be invoked by pika.

    :rtype: pika.SelectConnection

    """
    LOGGER.info('Connecting to %s', self._url)
    return pika.SelectConnection(parameters=pika.URLParameters(self._url),
                                 on_open_callback=self.on_connection_open,
                                 on_open_error_callback=self.on_connection_error,
                                 stop_ioloop_on_close=False,
                                 )

This is a very good way to tell RabbitMQ which heartbeat to associate with your consumer. Note that RabbitMQ will force it to be at least 60s. Thus you cannot set it lower.

More info on these connection parameters : https://pika.readthedocs.io/en/latest/modules/parameters.html

Upvotes: 0

Derick Bailey
Derick Bailey

Reputation: 72858

You might need to add a heartbeat to your message consumer to keep your connection alive.

If rabbitmq thinks the consumer died while the message is in "unacknowledged" mode (still being processed) it will put the message back in the queue. Having a heartbeat may help keep the connection alive, preventing this from happening.

Upvotes: 0

user1778420
user1778420

Reputation: 44

RabbitMQ broker implements a default heartbeat timeout that, depending on the RabbitMQ version, is either ~ 10 minutes or 1 minute; the shorter default is in the more recent versions, beginning with RabbitMQ v3.5.5. The application can pass an explicit longer heartbeat timeout preference via connection parameters. Pika's SelectConnection doesn't have a background thread, so when the work task takes longer than the heartbeat timeout, SelectConnection is unable to service the heartbeats within the time limits expected by the broker, and the broker drops the connection. There are different ways you can try to get around this issue:

  1. Set a longer heartbeat timeout preference via pika.connection.ConnectionParameters (probably the easiest). ConnectionParameters.heartbeat_interval=0 is supposed to disable heartbeats (and heartbeat timeouts) altogether.
  2. Run the connection on a separate thread from the task-processing logic
  3. Switch to one of the cooperative multitasking connection types, such as Tornado or Twisted framework-based adapters in Pika, or gevent-based adapter in Haigha. This change would require the task processing logic to be friendly to cooperative multitasking.

Upvotes: 1

Related Questions