Reputation: 61
I am unit testing celery tasks. I have chain tasks that also have groups, so a chord is resulted.
The test should look like:
I tried the following:
def wait_for_result(result):
result.get()
for child in result.children or list():
if isinstance(child, GroupResult):
# tried looping over task result in group
# until tasks are ready, but without success
pass
wait_for_result(child)
This creates a deadlock, chord_unlock being retried forever. I am not interested in task results. How can I wait for all the subtasks to finish?
Upvotes: 6
Views: 16960
Reputation: 1776
You can use group to get and wait all responses. However, it may not always be the most suitable solution if we are talking about long tasks. To handle more than one task you can also use chains and chords. This link is very explanatory and helped me with my decision making:
https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/
Upvotes: 0
Reputation: 387
Although this is an old question, I just wanted to share how I got rid of the deadlock issue, just in case it helps somebody.
Like the celery logs says, never use get()
inside a task. This indeed will create a deadlock.
I have a similar set of celery tasks which includes chain of group tasks, hence making it a chord. I'm calling these tasks using tornado, by making HTTP request. So what I did was something like this:
@task
def someFunction():
....
@task
def someTask():
....
@task
def celeryTask():
groupTask = group([someFunction.s(i) for i in range(10)])
job = (groupTask| someTask.s())
return job
When celeryTask()
is being called by tornado, the chain will start executing, & the UUID of someTask()
will be held in job
. It will look something like
AsyncResult: 765b29a8-7873-4b28-b05c-7e19c33e950c
This UUID is returned and the celeryTask()
exits before even the chain starts executing(ideally), hence leaving space for another process to run.
I then used the tornado layer to check the status of the task. Details on the tornado layer can be found in this stackoverflow question
Upvotes: 6
Reputation: 2000
Have you tried chord + callback ?
http://docs.celeryproject.org/en/latest/userguide/canvas.html#chords
>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
Upvotes: 3