dynamicmindset

Reputation: 61

Python celery - how to wait for all subtasks in chord

I am unit testing Celery tasks. I have chains that also contain groups, which results in a chord.

In the test, I need to wait for the whole workflow, including all subtasks, to finish. I tried the following:

from celery.result import GroupResult

def wait_for_result(result):
    result.get()
    for child in result.children or []:
        if isinstance(child, GroupResult):
            # tried looping over the task results in the group
            # until they are ready, but without success
            pass
        wait_for_result(child)

This creates a deadlock, with chord_unlock being retried forever. I am not interested in the task results. How can I wait for all the subtasks to finish?

Upvotes: 6

Views: 16960

Answers (3)

pxeba

Reputation: 1776

You can use a group to collect and wait for all the results. However, that may not always be the best fit for long-running tasks. To coordinate more than one task you can also use chains and chords. This article is very clear and helped me with my decision making:

https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/
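As a rough analogy (not Celery itself, since Celery tasks need a running broker), the chord pattern the article describes — run a group of tasks in parallel, then feed all their results to a callback — can be sketched with the standard library:

```python
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
    return x + y

def tsum(results):
    return sum(results)

# "Header": run the group of tasks in parallel.
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(add, i, i) for i in range(10)]
    header_results = [f.result() for f in futures]  # waits for every subtask

# "Callback": runs only after every header task has finished.
total = tsum(header_results)
print(total)  # 2 * (0 + 1 + ... + 9) = 90
```

In Celery the same shape is `chord([add.s(i, i) for i in range(10)])(tsum.s())`, with the broker doing the coordination instead of the thread pool.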

Upvotes: 0

Nitheesh A S

Reputation: 387

Although this is an old question, I wanted to share how I got rid of the deadlock issue, in case it helps somebody.

As the Celery logs warn, never call get() inside a task. That is exactly what creates the deadlock.

I have a similar set of Celery tasks, including a chain of group tasks, hence making it a chord. I'm calling these tasks from Tornado via an HTTP request. So what I did was something like this:

from celery import group

# `task` is assumed to be your app's decorator, e.g. `task = app.task`

@task
def someFunction(i):
    ...


@task
def someTask(results):
    ...


@task
def celeryTask():
    groupTask = group([someFunction.s(i) for i in range(10)])

    # Launch the chord and return its AsyncResult instead of blocking on it
    job = (groupTask | someTask.s()).apply_async()

    return job

When celeryTask() is called from Tornado, the chain starts executing, and job holds the AsyncResult of someTask(). It will look something like

AsyncResult: 765b29a8-7873-4b28-b05c-7e19c33e950c

This UUID is returned and celeryTask() exits before the chain even starts executing (ideally), leaving room for another task to run.

I then used the Tornado layer to check the status of the task. Details on the Tornado layer can be found in this Stack Overflow question.
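The status check itself is generic: instead of calling get() in a worker, the web layer polls the stored task ID until it reports ready, with a timeout. A minimal sketch of that loop, where `FakeResult` is a hypothetical stand-in for Celery's `AsyncResult` (which exposes the same `ready()` method):

```python
import time

class FakeResult:
    """Stand-in for celery.result.AsyncResult: becomes ready after a few polls."""
    def __init__(self, polls_until_done=3):
        self._remaining = polls_until_done

    def ready(self):
        self._remaining -= 1
        return self._remaining <= 0

def poll_until_ready(result, interval=0.01, timeout=1.0):
    """Poll result.ready() without calling get(), so no worker is blocked."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return True
        time.sleep(interval)
    return False

print(poll_until_ready(FakeResult()))  # True: ready on the third poll
```

With real Celery you would construct the result from the stored ID, e.g. `AsyncResult(task_id)`, and run the poll from the HTTP handler rather than from inside any task.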

Upvotes: 6

atv

Reputation: 2000

Have you tried chord + callback?

http://docs.celeryproject.org/en/latest/userguide/canvas.html#chords

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
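The value 9900 can be sanity-checked in plain Python: each header task computes add(i, i) = 2*i, and the tsum callback sums the 100 results:

```python
# Sum of add(i, i) for i in range(100), as the chord's callback computes it.
expected = sum(i + i for i in range(100))
print(expected)  # 9900
```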

Upvotes: 3
