Reputation: 53
I want to run a complex task scheduled by beat. Let us assume the default add/mul tasks are defined.
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        crontab(),
        add.s(2, 3) | mul.s(2)
    )
But this will return an error in the worker:
NotImplementedError: chain is not a real task
How can I schedule a non-trivial task, such as a chain, with Celery beat?
Upvotes: 5
Views: 3448
Reputation: 1
To schedule a chain periodically, declare the chain inside a regular @app.task and then pass that wrapper task to add_periodic_task(). Example:
from datetime import timedelta
from celery import chain

# on_after_finalize is used here because the handler is declared in task.py
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(timedelta(minutes=10), chian_st22.s(), name='test')

# Wrapper task: a real task whose only job is to build and launch the chain
@app.task
def chian_st22():
    cadena = chain(st22.s(), mailer.s()).apply_async()

# Second step of the chain: receives the return value of st22 as data
@app.task
def mailer(data):
    clase = CheckAlert()
    mail = clase.envio_mail(data)
    return mail

# First step of the chain
@app.task
def st22():
    clase = CheckAlert()
    st = clase.check_st22_dumps()
    return st
Upvotes: 0
Reputation: 909
One way to do that is to schedule your task chain in beat_schedule in your celeryconfig, using the link option. Here celery_tasks is the name of the module where your tasks are defined.
from celery.schedules import crontab
from celery import signature

beat_schedule = {
    'chained': {
        'task': 'celery_tasks.add',
        'schedule': crontab(),
        'options': {
            'queue': 'default',
            'link': signature('celery_tasks.mul',
                              args=(),
                              kwargs={},
                              options={
                                  'link': signature('celery_tasks.another_task',
                                                    args=(),
                                                    kwargs={},
                                                    queue='default')
                              },
                              queue='default')
        },
        'args': ()
    }
}
Upvotes: 6