Reputation: 5848
I am using the Example consumer posted here:
http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html
I switched to ExampleConsumer because my connection to RabbitMQ was failing once the work tasks started taking longer than 10 minutes. After a long-running task completed, the connection was reported as closed and the process failed. Before that, it had worked through 1000 messages that each took a minute or so without problems.
ExampleConsumer seems to reconnect fine. However, the message isn't actually acknowledged in acknowledge_message because the connection is already dead, even though the method below returns normally. The consumer then reconnects, after which the message that was just finished gets redelivered.
    def acknowledge_message(self, delivery_tag):
        """Acknowledge the message delivery from RabbitMQ by sending a
        Basic.Ack RPC method for the delivery tag.
        :param int delivery_tag: The delivery tag from the Basic.Deliver frame
        """
        LOGGER.info('Acknowledging message %s', delivery_tag)
        self._channel.basic_ack(delivery_tag)
Upvotes: 1
Views: 2553
Reputation: 293
If you are using the pika async consumer example, you just need to make this change in the __init__ method:
    self._url = 'amqp://{}:{}@{}:{}/%2F{}'.format(
        self.USERNAME, self.PASSWORD, self.ADDRESS, self.PORT, self.QUERY)
where self.QUERY is a query string that can carry different connection parameters, e.g. the heartbeat:
    self.QUERY = '?heartbeat_interval=600'
The connect method then takes care of the heartbeat negotiation.
    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika.
        :rtype: pika.SelectConnection
        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(
            parameters=pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            stop_ioloop_on_close=False,
        )
This is a convenient way to tell RabbitMQ which heartbeat to use for your consumer. Note that RabbitMQ will force it to be at least 60s, so you cannot set it lower.
More info on these connection parameters: https://pika.readthedocs.io/en/latest/modules/parameters.html
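As a rough sketch of the URL construction above, the helper below builds the same kind of AMQP URL with the standard library and URL-encodes each part. The name build_amqp_url and its arguments are hypothetical and not part of pika. It passes the option as heartbeat, which is the spelling newer pika releases accept in URLParameters; older releases use heartbeat_interval as shown above, so use whichever your installed version expects.

```python
from urllib.parse import quote, urlencode


def build_amqp_url(username, password, host, port, vhost="/", **query):
    """Return an AMQP URL with URL-encoded credentials, vhost and query string.

    Hypothetical helper for illustration; pika only needs the final string.
    """
    url = "amqp://{}:{}@{}:{}/{}".format(
        quote(username, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),  # the default vhost "/" is encoded as "%2F"
    )
    if query:
        # e.g. heartbeat=600 becomes "?heartbeat=600"
        url += "?" + urlencode(query)
    return url


if __name__ == "__main__":
    # The resulting string can be handed to pika.URLParameters(...)
    print(build_amqp_url("guest", "guest", "localhost", 5672, heartbeat=600))
```

Encoding the parts separately avoids surprises when a password or vhost contains characters such as "@" or "/".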
Upvotes: 0
Reputation: 72858
You might need to add a heartbeat to your message consumer to keep your connection alive.
If RabbitMQ thinks the consumer died while a message is still unacknowledged (i.e. still being processed), it will put the message back in the queue. A heartbeat may help keep the connection alive and prevent this from happening.
Upvotes: 0
Reputation: 44
The RabbitMQ broker applies a default heartbeat timeout that, depending on the RabbitMQ version, is either about 10 minutes or 1 minute; the shorter default applies to more recent versions, beginning with RabbitMQ v3.5.5. The application can request an explicit, longer heartbeat timeout via the connection parameters. Pika's SelectConnection has no background thread, so when a work task runs longer than the heartbeat timeout, SelectConnection cannot service heartbeats within the time the broker expects, and the broker drops the connection. There are different ways you can try to get around this issue:
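To illustrate the mechanism just described, here is a minimal sketch of one commonly used pattern (not necessarily one of the approaches this answer had in mind): run the long task on a worker thread so the I/O loop stays free to service heartbeats, then hand the acknowledgement back to the I/O loop thread. The function names and the task argument are hypothetical. The sketch assumes a pika version whose ioloop provides add_callback_threadsafe, and it relies only on that method plus the channel's is_open and basic_ack.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    """Runs on the I/O loop thread; skips the ack if the channel is closed."""
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body, task):
    """Runs on a worker thread so the I/O loop can keep servicing heartbeats."""
    task(body)  # the long-running job (hypothetical callable)
    # Channels should only be used from the I/O loop thread, so schedule
    # the ack there instead of calling basic_ack directly from this thread.
    callback = functools.partial(ack_message, channel, delivery_tag)
    connection.ioloop.add_callback_threadsafe(callback)


def start_work(connection, channel, delivery_tag, body, task):
    """Call this from on_message instead of doing the work inline."""
    worker = threading.Thread(
        target=do_work, args=(connection, channel, delivery_tag, body, task)
    )
    worker.start()
    return worker
```

If the channel has closed by the time the task finishes, the ack is skipped and the broker will redeliver the message, so the task should tolerate being run more than once.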
Upvotes: 1