Reputation: 135
I'd like to inspect prior scheduled tasks before creating a new one to prevent duplicates. How can I get a list of all scheduled tasks and their arguments using python and celery?
Upvotes: 6
Views: 5996
Reputation: 37708
I am putting this here for my own record, mainly, for the next time I need it. The syntax that worked for me:
from celery import app
app.app_or_default().conf.beat_schedule
gets
{'batch1': {'schedule': 900,
'task': 'app.tasks.batch1',
'args': (),
'kwargs': {},
'options': {}},
'batch2': {'schedule': 600,
'task': 'app.tasks.batch2',
'args': (),
'kwargs': {},
'options': {}},
....
Upvotes: 1
Reputation: 10709
my_proj/celery.py
from celery import Celery
from celery.schedules import crontab
app = Celery("my_proj")
app.conf.update(
imports=["task"],
timezone="UTC",
beat_schedule={
"task.common.add": {
"task": "task.common.add",
"schedule": crontab(),
'args': (1, 2),
},
"task.common.mul": {
"task": "task.common.mul",
"schedule": crontab(minute=0, hour=0),
'args': (3,),
'kwargs': {'y': 4},
},
},
)
task/common.py
from my_proj.celery import app # Use @shared_task if in Django
@app.task
def add(x, y):
print(f"{x}, {y}, {x + y}")
return x + y
@app.task
def mul(x, y):
print(f"{x}, {y}, {x * y}")
return x * y
Option 1:
>>> from my_proj.celery import app
>>> app.conf.beat_schedule
{'task.common.add': {'task': 'task.common.add', 'schedule': <crontab: * * * * * (m/h/d/dM/MY)>, 'args': (1, 2)}, 'task.common.mul': {'task': 'task.common.mul', 'schedule': <crontab: 0 0 * * * (m/h/d/dM/MY)>, 'args': (3,), 'kwargs': {'y': 4}}}
Option 2:
If using a file scheduler e.g. celery.beat.PersistentScheduler (default) which writes to a local shelve database file of name celerybeat-schedule
.
>>> import shelve
>>> with shelve.open('celerybeat-schedule') as schedule:
... print(schedule['entries'])
...
{'task.common.mul': <ScheduleEntry: task.common.mul task.common.mul(3, y=4) <crontab: 0 0 * * * (m/h/d/dM/MY)>, 'task.common.add': <ScheduleEntry: task.common.add task.common.add(1, 2) <crontab: * * * * * (m/h/d/dM/MY)>, 'celery.backend_cleanup': <ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>}
Option 3:
If using a database scheduler e.g. django-celery-beat or celery-sqlalchemy-scheduler, just query the database records. So if using django-celery-beat, it would be:
>>> from django_celery_beat.models import PeriodicTask, PeriodicTasks
>>> PeriodicTask.objects.all()
<ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) UTC>, <PeriodicTask: task.common.add: * * * * * (m/h/dM/MY/d) UTC>, <PeriodicTask: task.common.mul: 0 0 * * * (m/h/dM/MY/d) UTC>]>
>>> PeriodicTask.objects.values('name', 'task', 'args', 'kwargs')
<ExtendedQuerySet [{'name': 'celery.backend_cleanup', 'task': 'celery.backend_cleanup', 'args': '[]', 'kwargs': '{}'}, {'name': 'task.common.add', 'task': 'task.common.add', 'args': '[1, 2]', 'kwargs': '{}'}, {'name': 'task.common.mul', 'task': 'task.common.mul', 'args': '[3]', 'kwargs': '{"y": 4}'}]>
Upvotes: 7