Reputation: 8915
I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.
From my research, I understand that either a heartbeat or an increased connection timeout can be used to solve this. Both raise errors when I attempt them. In reading answers to similar posts I've also learned that RabbitMQ has changed a lot since those answers were posted (e.g. the default heartbeat timeout changed from 580 to 60 seconds in RabbitMQ 3.5.5).
When specifying a heartbeat and blocked connection timeout:
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
The following error is displayed:
TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
When specifying heartbeat_interval=1000 in the connection parameters, a similar error is shown: TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'
And similarly for socket_timeout=1000, the following error is displayed: TypeError: __init__() got an unexpected keyword argument 'socket_timeout'
I am running RabbitMQ 3.6.1, pika 0.10.0 and python 2.7 on Ubuntu 14.04.
I've read through answers to similar questions
Update: running code from the pika documentation produces the same error.
Upvotes: 17
Views: 11682
Reputation: 95
I've seen this issue before. The reason is that you declared the queue but didn't bind it to the exchange.
for example:
@Bean(name = "test_queue")
public Queue testQueue() {
    return queue("test_queue");
}

@RabbitListener(queues = "test_queue_1")
public void listenCreateEvent() {
}
If you listen on a queue that isn't bound to the exchange, this will happen.
Upvotes: -3
Reputation: 72868
I've run into the same problem in my own systems: connections dropped during very long tasks.
A heartbeat might help keep your connection alive if your network setup forcefully drops idle TCP/IP connections. If that's not the case, though, changing the heartbeat won't help.
Changing the connection timeout won't help at all. This setting is only used when initially creating the connection.
"I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued."
There are two reasons for this, both of which you have run into already:
Having deployed RabbitMQ code with tasks ranging from less than a second to several hours, I found that acknowledging the message immediately and updating the system with status messages works best for very long tasks like this.
You will need to have a system of record (probably with a database) that keeps track of the status of a given job.
When the consumer picks up a message and starts the process, it should acknowledge the message right away and send a "started" status message to the system of record.
As the process completes, send another message to say it's done.
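A minimal sketch of that flow, assuming a SQLite table stands in for the system of record. The `jobs` table, the function names, and the `ack` and `work` callables are hypothetical and chosen only for illustration. In a pika consumer, `ack` would wrap a call such as `channel.basic_ack(delivery_tag=method.delivery_tag)`.

```python
import sqlite3


def open_record(path=":memory:"):
    """Open the system of record: one row per job with its latest status."""
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        "job_id TEXT PRIMARY KEY, payload TEXT, status TEXT)"
    )
    db.commit()
    return db


def set_status(db, job_id, payload, status):
    """Insert the job, or overwrite its row with the new status."""
    db.execute(
        "INSERT OR REPLACE INTO jobs (job_id, payload, status) VALUES (?, ?, ?)",
        (job_id, payload, status),
    )
    db.commit()


def handle_message(db, job_id, payload, ack, work):
    """Handle one delivery: record it, acknowledge it, then do the work."""
    # Record the job before acknowledging, so every acknowledged
    # message already has a row in the system of record.
    set_status(db, job_id, payload, "started")
    ack()  # from here on the broker will not requeue this message
    work(payload)  # the long-running task (30 minutes or more)
    set_status(db, job_id, payload, "done")
```

Writing the "started" row before calling `ack` is a design choice in this sketch: if the process dies between the two steps, the message is requeued rather than lost, at the cost of a possible duplicate delivery.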
This won't solve the dropped connection problem, but nothing will solve that 100% anyway. Instead, it prevents the message from being re-queued when a connection is dropped.
This solution does introduce another problem, though: when the long running process crashes, how do you resume the work?
The basic answer is to use the system of record (your database) status for the job to tell you that you need to pick up that work again. When the app starts, check the database to see if there is work that is unfinished. If there is, resume or restart that work in whatever manner is appropriate.
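The startup check could look roughly like this, again assuming a SQLite `jobs` table with `job_id`, `payload`, and `status` columns as the system of record. The table layout and the `unfinished_jobs` and `recover` names are hypothetical, and whether an interrupted job is restarted from scratch or resumed partway depends entirely on what `work` does.

```python
import sqlite3


def unfinished_jobs(db):
    """Return (job_id, payload) for every job that started but never finished."""
    rows = db.execute(
        "SELECT job_id, payload FROM jobs WHERE status = 'started' ORDER BY job_id"
    )
    return rows.fetchall()


def recover(db, work):
    """Run once at application start, before consuming new messages."""
    resumed = []
    for job_id, payload in unfinished_jobs(db):
        work(payload)  # restart (or resume) the interrupted task
        db.execute("UPDATE jobs SET status = 'done' WHERE job_id = ?", (job_id,))
        db.commit()
        resumed.append(job_id)
    return resumed
```

A consumer would call `recover(db, work)` before it starts consuming, so that jobs acknowledged by a crashed process are not silently dropped.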
Upvotes: 15