Reputation: 185
I'm pretty sure this can only be done if I create my own task class, but I'd like to know if anyone else has found a way to do this.
Upvotes: 16
Views: 17250
Reputation: 6555
Here is a full solution (works for Celery 4+):
import celery
from celery.task import task
class MyBaseClassForTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
# exc (Exception) - The exception raised by the task.
# args (Tuple) - Original arguments for the task that failed.
# kwargs (Dict) - Original keyword arguments for the task that failed.
print('{0!r} failed: {1!r}'.format(task_id, exc))
@task(name="foo:my_task", base=MyBaseClassForTask)
def add(x, y):
raise KeyError()
Resources:
Upvotes: 24
Reputation: 2757
You can provide the function directly to the decorator:
def fun(self, exc, task_id, args, kwargs, einfo):
print('Failed!')
@task(name="foo:my_task", on_failure=fun)
def add(x, y):
raise KeyError()
Upvotes: 17