Reputation: 1008
My rabbitmq consumer processes incoming messages as shown in the following example: http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html
The difference between my code and the example is that in my code each message takes a lot of time to process. (more than a hour).
When message finishes processing the following line is executed:
self._channel.basic_ack(delivery_tag)
However right after that, I receive an on_connection_closed event, which causes my consumer to reconnect.
The problem is that the ack is not delivered and the message that have already been processed is sent to another consumer and processed again.
Any help will be appreciated.
Upvotes: 1
Views: 809
Reputation: 1008
After a long research, found out what was the problem.
Apparently, despite the consumer implementation features asynchronous callbacks, all of them are called from the same thread.
If the "on_message" method is being executed for a long time, it prevents the thread from sending heartbeat messages to the server, which causes it to close the connection after a while.
The solution was to move the message handling to separate thread. There is one catch however. In order to prevent the consumer from handling more that one message simultaneously, the following line must be added to on_channel_open method:
channel.basic_qos(prefetch_count=self._num_of_cores)
Upvotes: 3