Michael DiStefano

Reputation: 594

How do you ensure a Celery chord callback gets called with failed subtasks?

I am using a Chord in Celery to have a callback that gets called when a Group of parallel tasks finish executing. Specifically, I have a group of functions that wrap calls to an external API. I want to wait for all of these to return before I process the results and update my database in the Chord callback. I would like the callback to execute when all of the API calls have finished, regardless of their status.

My problem is that the callback is only called if none of the group's subtasks raise an exception. If even one subtask raises, an optional error handler attached with on_error() is called instead, with the chord's task_id as a string. The remaining tasks in the group do continue executing, but the callback is never called.

I'll illustrate this with an example below:

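from random import randint

from celery import Celery, group

# Chords need a result backend; configure broker and backend for your setup
app = Celery('tasks')


@app.task
def maybe_succeed():
    # randint(0, 10) can return 0, which raises ZeroDivisionError
    divisor = randint(0, 10)
    return 1 / divisor


@app.task
def master_task():
    g = group([maybe_succeed.s() for _ in range(100)])
    # Piping a group into a task signature turns it into a chord
    c = g | chord_callback.s()
    # Return the chord's id; a plain string serializes cleanly
    return c.delay().id


@app.task
def chord_callback(results):
    print('Made it here!')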

In the example above, calling master_task() runs every task in the group, but the callback never gets called, because at least one maybe_succeed() call will fail with a ZeroDivisionError (unless you're super lucky!).
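The "super lucky" remark can be quantified. Assuming each call draws independently and uniformly from the 11 integers 0 to 10, a single call succeeds with probability 10/11, so all 100 succeed with probability (10/11)^100:

```python
# Probability that all 100 maybe_succeed() calls succeed.
# randint(0, 10) draws uniformly from 11 integers and only 0 fails,
# so each independent call succeeds with probability 10/11.
p_single = 10 / 11
p_all = p_single ** 100

print(f"P(all 100 succeed) = {p_all:.2e}")  # 7.26e-05
print(f"roughly 1 in {round(1 / p_all):,}")  # roughly 1 in 13,781
```

In other words, the callback runs on fewer than 1 in 10,000 attempts.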


Right now, I'm working around this by catching all exceptions in my equivalent of maybe_succeed(), so the chord never fails. That works, but it doesn't feel right.
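A minimal sketch of that catch-all workaround, written as a reusable decorator in plain Python so it runs without a broker. The names capture_errors and the {"ok": ..., ...} record format are hypothetical, chosen for illustration, and maybe_succeed takes an optional divisor here only so the example is deterministic. Stacking the decorator under @app.task is an assumption about how you would wire it into a Celery app.

```python
import functools
import random


def capture_errors(func):
    """Hypothetical helper: run func and always return a result record.

    Exceptions are caught and described instead of raised, so a chord
    header built from tasks wrapped this way never fails.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return {"ok": True, "value": func(*args, **kwargs)}
        except Exception as exc:
            return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}
    return wrapper


# In a Celery app this would sit under @app.task (assumption), e.g.
#   @app.task
#   @capture_errors
#   def maybe_succeed(): ...
@capture_errors
def maybe_succeed(divisor=None):
    if divisor is None:
        divisor = random.randint(0, 10)
    return 1 / divisor


def chord_callback(results):
    """Split the header's result records into successes and failures."""
    values = [r["value"] for r in results if r["ok"]]
    errors = [r["error"] for r in results if not r["ok"]]
    return values, errors


results = [maybe_succeed(d) for d in (1, 0, 4)]
print(chord_callback(results))
# ([1.0, 0.25], ['ZeroDivisionError: division by zero'])
```

Returning plain dicts keeps the results JSON-serializable and lets the callback tell failures apart from legitimate return values without guessing.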

So, my question is: Is there a way to have a Celery Chord callback execute regardless of the return status of its group's subtasks?

Upvotes: 7

Views: 5430

Answers (2)

danidee

Reputation: 9624

I ran into this problem recently, and the only solution I've found so far is to monkey-patch the result backend (in my case Redis) so that it doesn't re-raise the exception.

import celery
from celery import states
from celery.backends.redis import RedisBackend


def patch_celery():
    """Patch the Redis backend so failed chord members don't abort the chord.

    This overrides a private method, so it may break on other Celery versions.
    """
    def _unpack_chord_result(
        self, tup, decode,
        EXCEPTION_STATES=states.EXCEPTION_STATES,
        PROPAGATE_STATES=states.PROPAGATE_STATES,
    ):
        _, tid, state, retval = decode(tup)

        if state in EXCEPTION_STATES:
            retval = self.exception_to_python(retval)
        if state in PROPAGATE_STATES:
            # retval is an Exception: describe it instead of raising
            return '{}: {}'.format(retval.__class__.__name__, retval)

        return retval

    RedisBackend._unpack_chord_result = _unpack_chord_result

    return celery

Now you can call patch_celery().Celery to access the patched version. I wrote an article that explains everything.
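To see what the patch changes, here is a plain-Python model of the patched _unpack_chord_result that runs without Celery or Redis. The state names are assumed to mirror celery.states, with PROPAGATE_STATES covering FAILURE and REVOKED; unpack_member is a hypothetical name, and this is an illustration rather than Celery's code.

```python
# Assumed to mirror celery.states.PROPAGATE_STATES
PROPAGATE_STATES = frozenset({"FAILURE", "REVOKED"})


def unpack_member(state, retval):
    """Return a chord member's value; describe failures instead of raising."""
    if state in PROPAGATE_STATES:
        # Patched behaviour: the stock backend raises here, which is what
        # stops the chord callback from ever running.
        return "{}: {}".format(type(retval).__name__, retval)
    return retval


# (state, return value) pairs as the chord callback's inputs would arrive
members = [
    ("SUCCESS", 1.0),
    ("FAILURE", ZeroDivisionError("division by zero")),
    ("SUCCESS", 0.5),
]
results = [unpack_member(state, retval) for state, retval in members]
print(results)  # [1.0, 'ZeroDivisionError: division by zero', 0.5]
```

One consequence of this design: a failure reaches the callback as an ordinary string, so if your tasks can legitimately return strings, the callback cannot tell the two apart.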

Upvotes: 6

user9538

Reputation: 1014

You could try calling the original callback in the errback:

from celery import Celery, chord

# Chords need a result backend; configure broker and backend for your setup
celery = Celery('tasks')


@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y


@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')


@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')


@celery.task
def on_chord_error(task_id, extra_info):
    # Called as on_chord_error(<task id>, 'extra info') when the chord fails;
    # it then invokes the original callback itself
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    callme.delay(extra_info)


@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)

Which results in:

Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task:tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7]  ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
  File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
    ret = j(timeout=3.0, propagate=True)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
    interval=interval, no_ack=no_ack, on_interval=on_interval,
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
    self.maybe_throw(callback=callback)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
    raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
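The log shows the trade-off of this approach: callme still runs after the failure, but it receives the errback's own argument ('extra info') rather than the header results. A toy, in-process model of that control flow makes this explicit; run_chord is a hypothetical helper that only imitates the sequencing described in the question and this answer, not Celery's implementation.

```python
import uuid


def run_chord(header, body, errback):
    """Toy model: run every header task, then body(results) or errback(id)."""
    chord_id = str(uuid.uuid4())
    results, failed = [], False
    for task in header:  # every header task runs, even after a failure
        try:
            results.append(task())
        except Exception:
            failed = True
    if failed:
        return errback(chord_id)
    return body(results)


def callme(stuff):
    return f"Callback arg: {stuff}"


def on_chord_error(task_id, extra_info="extra info"):
    # As in the answer: forward the errback's extra argument to the callback
    return callme(extra_info)


def failure():
    raise ValueError("BAD")


ok_header = [lambda: 1 + 1, lambda: 2 + 2]
bad_header = [lambda: 1 + 1, lambda: 2 + 2, failure, lambda: 3 + 3]

print(run_chord(ok_header, callme, on_chord_error))   # Callback arg: [2, 4]
print(run_chord(bad_header, callme, on_chord_error))  # Callback arg: extra info
```

So if the callback needs the successful values after a failure, the errback has to obtain them some other way before calling it.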

Upvotes: 5
