abrugh

Reputation: 185

Can I add an on_failure callback to a celery task created with the task decorator?

I'm pretty sure this can only be done if I create my own task class, but I'd like to know if anyone else has found a way to do this.

Upvotes: 16

Views: 17250

Answers (2)

illagrenan

Reputation: 6555

Here is a full solution (works for Celery 4+):

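import celery
from celery import shared_task


class MyBaseClassForTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Called by the worker when the task fails.
        # exc (Exception) - The exception raised by the task.
        # task_id (str) - Unique id of the failed task.
        # args (Tuple) - Original arguments for the task that failed.
        # kwargs (Dict) - Original keyword arguments for the task that failed.
        # einfo (ExceptionInfo) - Exception information, including the traceback.
        print('{0!r} failed: {1!r}'.format(task_id, exc))


# shared_task is importable in both Celery 4 and 5;
# the older celery.task.task decorator was removed in Celery 5.
@shared_task(name="foo:my_task", base=MyBaseClassForTask)
def add(x, y):
    raise KeyError()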

Upvotes: 24

Nolan Conaway

Reputation: 2757

You can provide the function directly to the decorator:

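from celery import shared_task


def fun(self, exc, task_id, args, kwargs, einfo):
    # Attached to the generated task class, so `self` is the task instance.
    print('Failed!')


@shared_task(name="foo:my_task", on_failure=fun)
def add(x, y):
    raise KeyError()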
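
Why this works, as far as I understand it: keyword options passed to the decorator end up as attributes on a task class generated for the function, so a plain function passed as on_failure behaves like a method and receives the task instance as `self`. Here is a rough pure-Python model of that idea. It is a simplified sketch, not Celery's actual code; `BaseTask`, `run_and_trace`, the local `task` decorator and `record_failure` are all hypothetical stand-ins made up for illustration.

```python
# Simplified stand-in for illustration only; this is NOT Celery's real code.
class BaseTask:
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Default handler: do nothing."""

    def run_and_trace(self, task_id, *args, **kwargs):
        # Run the task body and route any exception to on_failure.
        try:
            return self.run(*args, **kwargs)
        except Exception as exc:
            self.on_failure(exc, task_id, args, kwargs, einfo=None)
            return None


def task(**options):
    """Hypothetical decorator: options become attributes of a new subclass."""
    def decorator(fun):
        attrs = dict(options)
        # staticmethod keeps the task body from receiving `self`.
        attrs["run"] = staticmethod(fun)
        # Build a subclass on the fly and return an instance of it.
        return type(fun.__name__, (BaseTask,), attrs)()
    return decorator


failures = []


def record_failure(self, exc, task_id, args, kwargs, einfo):
    # `self` is the task instance, because the function now lives on the class.
    failures.append((self.name, task_id, type(exc).__name__, args))


@task(name="foo:my_task", on_failure=record_failure)
def add(x, y):
    raise KeyError()


add.run_and_trace("id-1", 2, 3)
print(failures)
```

Because the override lives on the generated subclass, it replaces the base class's no-op handler for that one task only; other tasks keep the default.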

Upvotes: 17