Reputation: 3080
I am using celery+rabbitmq. I can't find convenient way to clear queue in celery+rabbitmq. I do it with remove and create vhost.
rabbitmqctl delete_vhost <vhostpath>
rabbitmqctl add_vhost <vhostpath>
Is it prefer way to clear some celery queue ?
Upvotes: 4
Views: 5706
Reputation: 1656
If you are facing this problem because you used rabbitmq for the result backend and as a result you got too many queues, then i would suggest using a different result backend (redis or mongodb)
This is one well known flaw with the celery. It will create a separate queue for each result if you amqp for result backend.
If you still want to stick to amqp as result backend. It will clear itself in 24 hours. You can however set it to a smaller value using CELERY_AMQP_TASK_RESULT_EXPIRES
setting.
Upvotes: 2
Reputation: 1458
If you need to delete ALL items in queue (especially when the list is long)
1) Saves all items into the file
sudo rabbitmqctl list_queues -p /yourvhost name > queues.txt
don't forget to remove first and last lines from 'queues.txt'
2) Use mentioned python code to do the job
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="127.0.0.1:5672", userid="guest", password="guest", virtual_host="/yourvhost", insist=False)
conn = conn.channel()
queues = None
with open('queues.txt', 'r') as f:
queues = f.readlines()
for q in queues:
if q:
#print 'deleting %s' % q
conn.queue_purge(q.strip())
print 'purged %d items' % len(queues)
Upvotes: 1
Reputation: 15335
I'm not quite sure how celery works, but I suspect you want to purge a RabbitMQ queue (you're currently simulating this by deleting the queues and having celery re-create them).
You could install RabbitMQ's Management Plugin. Its WebUI will allow you to purge the required queue. This should also tell you which queue you're aiming for, so you wouldn't need to delete everything.
Once you know which queue it is, you could purge it programatically. For instance, using py-amqplib, you would do something like:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
conn = conn.channel()
conn.queue_purge("the-target-queue")
There's probably a better way to do it, though.
Upvotes: 13