Reputation: 742
I am trying to execute a group of tasks, send the result of each task in the group to a second task, and then send the results of all those second tasks to one final task.
For example:
jobgroup = (
    group(tasks.task1.s(p) for p in params) |
    tasks.dmap.s(tasks.task2.s())
)
That part works, but the problem appears when I try to pass all the results into one final task, e.g.:
mychain = chain(jobgroup | tasks.task3.s())
What I am seeing is that task3() is called while the task2 tasks are still in the PENDING state. I can tell because I log each status as it comes in from within my task3 function:
@task()
def task3(input):
    for item in input.results:
        logger.info(item.status)
Log results:
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
So how do I set this up so that task3 is not called until all the tasks in jobgroup are complete?
Upvotes: 2
Views: 1321
Reputation: 29514
A chord consists of a header and a body. The header is a group of tasks that MUST COMPLETE before the callback is called. A chord is essentially a callback for a group of tasks.
Example:
>>> from celery import chord
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())
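To make the guarantee concrete, here is a minimal sketch of the same "header first, then callback" barrier, written with the standard library's concurrent.futures rather than Celery so it runs without a broker. The helper name run_chord and the local add function are hypothetical and exist only for this illustration; they are not part of Celery's API.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def add(x, y):
    # Stand-in for a header task (hypothetical, mirrors add.s(x, y) above).
    return x + y


def run_chord(header, body, max_workers=4):
    """Run every (func, args) pair in `header`, then call `body` once
    with the list of all results. `run_chord` is an illustrative helper,
    not a Celery function."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # Barrier: block until every header call has finished.
        wait(futures)
        # Collect results in the same order the header was given.
        results = [f.result() for f in futures]
    # The callback only ever sees finished values, never pending handles.
    return body(results)


total = run_chord([(add, (2, 2)), (add, (4, 4))], sum)
print(total)  # prints 12
```

The point of the sketch is that the callback receives plain, finished values. If the callback instead receives result handles whose status is still PENDING, as in the question, the barrier was not applied to those tasks.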
You can use chord to accomplish the task.
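from celery import chord, group, chain
# Header, step 1: run task1 once per parameter.
task_1 = group(tasks.task1.s(p) for p in params)
# Header, step 2: map task2 over the task1 results (closing parenthesis added).
task_2 = group(tasks.dmap.s(tasks.task2.s()))
# Body: the callback that should run only after the header has completed.
task_3 = tasks.task3.s()
# Calling the chord with the body submits the workflow.
workflow = chord(chain([task_1, task_2]))(task_3)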
Upvotes: 1