Reputation:
I'm having a problem getting my RabbitMQ queues to expire.
I'm using RabbitMQ 3.2.4; Server is running on Windows, and my client code is in C#.
I have tried setting both the queue's auto-delete parameter and the queue's TTL value via x-expire
. I have tried priming the queue with a dummy message to fake the appearance of a consumer, and I have even tried varying the exchange's persistence parameter.
Of the five possible combinations, none of them lead to the queue being deleted. I have waited hours (days?) after the last connection has closed, but the queues do not go away.
| auto-delete | x-expires | Prime |
| ----------- | --------- | ----- |
| false | false | false | // Don't care; no delete possible
| false | false | true | // Don't care; no delete possible
| false | true | false |
| false | true | true |
| true | false | false | // Fails consumer requirement
| true | false | true |
| true | true | false |
| true | true | true |
A queue won't auto-delete unless it has had at least one consumer, otherwise the queue could auto-delete immediately after being declared.
In case I'm presenting an XY problem, here's what I'm trying to do. We have a main queue that holds work tasks. We have a large number of workers that will pull a task; run and save the calculations; and then repeat that cycle. Every now and then a worker will pull a task but will not complete. We need the task to get requeued after a period of time so another worker can attempt to work on it.
I have seen a number of sites / blogs / whatever suggest using a retry queue along with RabbitMQ's dead-letter-exchange capabilities in order to requeue the task. The simplified work flow is:
Pull a task
Push a copy of the task to a retry queue
Perform work
Pull copy of task from retry queue to prevent requeueing.
If the worker fails to pull the copy, the copy will expire and get redirected back onto the main work queue.
The overall approach works, but the problem is that it creates a lot of empty retry queues. I'd like those retry queues to be deleted off.
Relevant code snippets.
I can provide the priming code if it's relevant but it's just a BasicPublish
followed by a BasicGet
private ConnectionFactory factory;
private IConnection connection;
private IModel channel;
private static string MainExchange = "MainExchange";
private static string RetryExchange = "RetryExchange";
private static string MainQueue = "MainQueue";
private static int messageRequeueTTL = 30000;
private static int requeueQueueTTL = messageRequeueTTL + 15000;
factory = new ConnectionFactory() { ... }
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.ExchangeDeclare(MainExchange, ExchangeType.Topic, true);
channel.ExchangeDeclare(RetryExchange, ExchangeType.Headers, false);
channel.QueueDeclare(MainQueue, true, false, false, null);
channel.QueueBind(MainQueue, MainExchange, "");
// Populate MainQueue with several calls of: channel.BasicPublish(MainExchange, "", null, body);
// ...
// Pull a message
BasicGetResult result = channel.BasicGet(MainQueue, false);
// Logic for requeueing; Foo is my work task class
string retryQueue = CreateRequeueName(foo.ID);
Dictionary<string, object> queueArgs = new Dictionary<string, object>
{
{"x-dead-letter-exchange", MainExchange}
,{"x-message-ttl", messageRequeueTTL}
};
Dictionary<string, object> bindArgs = new Dictionary<string, object>
{
{"x-match", "all"}
,{"key1", foo.ID}
,{"x-expires", requeueQueueTTL}
};
// Set auto delete or not here
channel.QueueDeclare(retryQueue, false, false, false, queueArgs);
channel.QueueBind(retryQueue, RetryExchange, "", bindArgs);
PrimeRetryQueue(foo.ID);
var body = Encoding.UTF8.GetBytes(Foo.ToXML(foo));
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>() { { "key1", foo.ID } };
channel.BasicPublish(RetryExchange, "", props, body);
//Acknowledge original message pulled from MainQueue
channel.BasicAck(result.DeliveryTag, false);
Upvotes: 3
Views: 2379
Reputation: 364
The "MainQueue" queue isn't deleted because you're setting the 4th parameter, autoDelete = false. This is the method signature:
QueueDeclareOk QueueDeclare (string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments);
If you want the queue to be deleted when the connection is closed, you need to do this:
channel.QueueDeclare(MainQueue, true, true, true, null);
In the second case, with your retryQueue, you are declaring it with "x-message-ttl" which controls message expiration, not queue expiration. The messages sent to that queue should expire after 30 seconds, but the queue will remain. You are also passing "x-expires" in in queue binding arguments, and AFAIK it has no effect there. If you want the queue itself to expire after 30 seconds, you should set this in the queue declaration arguments, queueArgs in your code.
Dictionary<string, object> queueArgs = new Dictionary<string, object>
{
{"x-dead-letter-exchange", MainExchange},
{"x-expires", messageRequeueTTL}
};
channel.QueueDeclare(retryQueue, false, false, false, queueArgs);
For your reference: https://www.rabbitmq.com/ttl.html
Upvotes: 3