Reputation: 3725
I'm aware of Celery's command-line options:
celery -A my_app purge -Q queue_name
But I am looking for a way to purge queue_name from my Python app with Celery, something along the lines of:
def start_chunk(num_of_objs):
    # clear current queue before starting here
    # RELEVANT CODE HERE TO PURGE queue_name
    for num in range(num_of_objs):
        some_task.apply_async(kwargs={'num': num}, queue="queue_name")
Note, I'm aware of this:
from proj.celery import app
app.control.purge()
But as I understand it, this purges all queues.
Upvotes: 2
Views: 956
Reputation: 3725
One way is to call the purge command through the os module. The -f flag is required so that the command does not stop and ask for confirmation:
import os
os.system("celery -A app_name purge -Q queue_name -f")
I don't know if there are any cons to this compared to DejanLekic's answer, which looks cleaner.
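If you do shell out, a slightly more defensive variant might use subprocess.run with an argument list instead of os.system. That way the queue name is never interpreted by a shell, and a non-zero exit code raises instead of being silently ignored. This is only a minimal sketch: it assumes the celery executable is on PATH, and build_purge_command and purge_via_cli are hypothetical helper names, not part of Celery.

```python
import subprocess


def build_purge_command(app_name, queue_name):
    """Build the argv list for `celery -A <app> purge -Q <queue> -f`."""
    # -f skips the interactive confirmation prompt.
    return ["celery", "-A", app_name, "purge", "-Q", queue_name, "-f"]


def purge_via_cli(app_name, queue_name):
    """Run the purge command; raises CalledProcessError on a non-zero exit."""
    # Assumption: the `celery` executable is available on PATH.
    return subprocess.run(
        build_purge_command(app_name, queue_name),
        check=True,           # turn a failing exit code into an exception
        capture_output=True,  # keep stdout/stderr for logging
        text=True,
    )
```

Calling purge_via_cli("app_name", "queue_name") would then be the equivalent of the os.system line above, with the command's output available on the returned object's stdout attribute.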
Upvotes: 0
Reputation: 19787
I admit, it is a little bit difficult to get this right. The first place to look for hints on how to solve this problem is bin/purge.py in the Celery source (that is what I did). After analysing that file, I think something like the following should work:
from celery.app.base import Celery
from yourproject import celery_app

def purge_queue(app: Celery, queue_name: str):
    with app.connection_for_write() as conn:
        conn.default_channel.queue_purge(queue_name)

purge_queue(celery_app, "celery")  # purge the "celery" queue
The above is more or less what celery -A yourproject.celery_app purge -Q celery does.
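The same idea can be stretched a bit to report how many messages were dropped and to tolerate queues that do not exist on the broker yet. Treat this as a rough sketch under assumptions: it assumes queue_purge returns the number of removed messages (or nothing) and that the connection exposes a channel_errors tuple, as kombu connections do. purge_queues is a hypothetical helper name.

```python
def purge_queues(app, queue_names):
    """Purge each named queue and return {queue_name: messages_removed}."""
    removed = {}
    with app.connection_for_write() as conn:
        for name in queue_names:
            try:
                # Assumption: queue_purge returns the number of deleted
                # messages; fall back to 0 if it returns None.
                removed[name] = conn.default_channel.queue_purge(name) or 0
            except conn.channel_errors:
                # For example, the queue has not been declared yet.
                removed[name] = 0
    return removed
```

In the question's start_chunk, a call such as purge_queues(app, ["queue_name"]) would go right before the loop that enqueues the new tasks. Note that tasks already picked up (prefetched) by a worker are no longer in the queue, so purging does not affect them.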
Upvotes: 1