Reputation: 594
I am using a Chord in Celery to have a callback that gets called when a Group of parallel tasks finishes executing. Specifically, I have a group of functions that wrap calls to an external API. I want to wait for all of these to return before I process the results and update my database in the Chord callback. I would like the callback to execute when all of the API calls have finished, regardless of their status.
My problem is that the callback function only gets called if none of the group's subtasks raise an exception. If, however, one subtask raises an exception, then an optional error handler on_error() gets called with a string representation of the task_id of the chord. The remaining tasks in the group do continue execution, but the callback is never called.
I'll illustrate this with an example below:
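# Assumes `app` is an already configured Celery application.
from random import randint

from celery import group

@app.task
def maybe_succeed():
    divisor = randint(0, 10)  # inclusive bounds, so 0 is a possible value
    return 1 / divisor  # raises ZeroDivisionError when divisor == 0

@app.task
def master_task():
    g = group([maybe_succeed.s() for i in range(100)])
    c = g | chord_callback.s()  # chaining a group into a task forms a chord
    return c.delay()

@app.task
def chord_callback(results):
    print('Made it here!')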
In the above example, calling master_task() will run all of the tasks in the group. However, the callback will never get called, because at least one of the maybe_succeed() calls will fail with a ZeroDivisionError (unless you're super lucky!).
Right now, I'm dealing with this problem by catching all exceptions in my equivalent of maybe_succeed() so that the chord will never fail. I guess this is a fine solution, though it doesn't feel right.
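For readers who want to see that workaround spelled out, here is a minimal sketch of one way it could look. It is not the code from my project: the decorator name capture_exceptions, the {"ok": ...} result shape, and the summarize() helper are all made up for illustration. The block is plain Python so it runs without a broker; with Celery the decorator would sit underneath @app.task.

```python
import functools
from random import randint


def capture_exceptions(func):
    """Hypothetical helper: report the outcome as a dict instead of raising."""
    @functools.wraps(func)  # keeps the original function name for task naming
    def wrapper(*args, **kwargs):
        try:
            return {"ok": True, "value": func(*args, **kwargs)}
        except Exception as exc:  # deliberately broad so the chord never fails
            return {"ok": False, "error": "{}: {}".format(type(exc).__name__, exc)}
    return wrapper


# With Celery this would be stacked as:
#   @app.task
#   @capture_exceptions
#   def maybe_succeed(): ...
@capture_exceptions
def maybe_succeed(divisor=None):
    if divisor is None:
        divisor = randint(0, 10)
    return 1 / divisor


def summarize(results):
    """What a chord callback could do with the list of outcome dicts."""
    succeeded = [r["value"] for r in results if r["ok"]]
    failed = [r["error"] for r in results if not r["ok"]]
    return succeeded, failed
```

Because every subtask now returns normally, the chord callback always receives one entry per subtask and can decide for itself what to do with the failures. The cost is that the tasks show up as successful in the result backend even when the wrapped call failed.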
So, my question is: Is there a way to have a Celery Chord callback execute regardless of the return status of its group's subtasks?
Upvotes: 7
Views: 5430
Reputation: 9624
I stumbled upon this problem recently, and the only solution I have found so far is to monkey patch the result backend (in my case Redis) to stop the exception from being re-raised.
import celery
from celery import Celery, group, states
from celery.backends.redis import RedisBackend

def patch_celery():
    """Patch the redis backend."""
    def _unpack_chord_result(
        self, tup, decode,
        EXCEPTION_STATES=states.EXCEPTION_STATES,
        PROPAGATE_STATES=states.PROPAGATE_STATES,
    ):
        _, tid, state, retval = decode(tup)
        if state in EXCEPTION_STATES:
            retval = self.exception_to_python(retval)
        if state in PROPAGATE_STATES:
            # retval is an Exception
            return '{}: {}'.format(retval.__class__.__name__, str(retval))
        return retval

    celery.backends.redis.RedisBackend._unpack_chord_result = _unpack_chord_result
    return celery
Now you can call patch_celery().Celery to access the patched version. I wrote an article that explains everything.
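To make the effect of the patch concrete, here is a small plain-Python sketch of what the chord callback would then receive. It assumes the patch applies cleanly to your Celery version (`_unpack_chord_result` is an underscore-prefixed method, so that is not guaranteed). The names describe_failure and split_results are hypothetical, and the rule "any string is a failure" only holds if your successful tasks never return strings.

```python
def describe_failure(exc):
    # Same expression the patched _unpack_chord_result returns for a failed task.
    return '{}: {}'.format(exc.__class__.__name__, str(exc))


def split_results(results):
    """Hypothetical callback helper: separate failure strings from real values.

    Assumption: successful tasks return non-string values, so every str in
    the list is one of the 'ExceptionName: message' markers from the patch.
    """
    failures = [r for r in results if isinstance(r, str)]
    values = [r for r in results if not isinstance(r, str)]
    return values, failures


# Example: three tasks succeeded, one raised ValueError('BAD').
results = [2, 4, describe_failure(ValueError('BAD')), 6]
values, failures = split_results(results)
```

If your tasks can legitimately return strings, the patch would need to return a more distinctive marker (for example a dict) so the callback can tell the two cases apart.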
Upvotes: 6
Reputation: 1014
You could try calling the original callback in the errback:
# Assumes `celery` is the configured Celery app instance.
from celery import chord

@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y

@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')

@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')

@celery.task
def on_chord_error(task_id, extra_info):
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    # Invoke the original callback manually, since the chord will not
    callme.delay(extra_info)

@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)
Which results in:
Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task: tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
ret = j(timeout=3.0, propagate=True)
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
interval=interval, no_ack=no_ack, on_interval=on_interval,
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
self.maybe_throw(callback=callback)
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
self.on_ready.throw(*args, **kwargs)
File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
reraise(type(exc), exc, tb)
File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
Upvotes: 5