Andrew

Reputation: 742

Celery chaining executing subtask before group tasks completed

I want to execute a group of tasks, send the result of each task in the group to a second task, and then send the results of all of those second tasks to one final task.

e.g.

jobgroup = (
           group(tasks.task1.s(p) for p in params) |
           tasks.dmap.s(tasks.task2.s())
          )

That part works, but the problem appears when I try to feed all the results into one final task, e.g.

mychain = chain(jobgroup | tasks.task3.s())

What I am seeing is that task3() is called while the task2 tasks are still in the PENDING state. I can tell because task3 logs the status of each result it receives (the following snippet is what I log from within my task3 function):

@task()
def task3(input):
   for item in input.results:
      logger.info(item.status)

Log results

[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING

So how do I set this up so that task3 is not called until all the tasks in jobgroup are complete?

Upvotes: 2

Views: 1321

Answers (1)

Chillar Anand

Reputation: 29514

Chord:

A chord consists of a header and a body. The header is a group of tasks that MUST COMPLETE before the callback is called. A chord is essentially a callback for a group of tasks.

Example:

>>> from celery import chord
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())
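To see the ordering guarantee without a broker, here is a rough standard-library analogy. It is not how Celery implements chords; `run_chord` is a hypothetical helper, and `add` and `sum_task` are stand-ins for the tasks in the example above. The point is only that the body runs once, after every header call has finished:

```python
from concurrent.futures import ThreadPoolExecutor, wait


def add(x, y):
    # Stand-in for a header task.
    return x + y


def sum_task(values):
    # Stand-in for the chord body (the callback).
    return sum(values)


def run_chord(header_calls, body):
    """Run every header call, wait for all of them, then call body once.

    header_calls is a list of (function, args) pairs. This is an analogy
    built on threads, not Celery's chord implementation.
    """
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn, *args) for fn, args in header_calls]
        wait(futures)  # barrier: block until every header call is done
        # The body receives finished values, in header order.
        return body([f.result() for f in futures])


result = run_chord([(add, (2, 2)), (add, (4, 4))], sum_task)
print(result)  # 12
```

Because the body is handed finished values rather than result handles, it never sees a PENDING status, which is the behaviour the question is asking for.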

You can use a chord so that task3 is only called once the tasks before it have completed:

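from celery import chord, group, chain

# `tasks` and `params` are the module and parameter list from the question.
task_1 = group(tasks.task1.s(p) for p in params)
task_2 = group(tasks.dmap.s(tasks.task2.s()))
task_3 = tasks.task3.s()

# task_3 is the chord body (the callback); calling the chord with it submits the workflow.
workflow = chord(chain([task_1, task_2]))(task_3)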

Upvotes: 1
