avaleske


Celery task that returns group in chain

I'm trying to set up a Celery task, as part of a Django site, that is called by celerybeat and runs several steps of a processing pipeline.

The first step downloads data files, the second plots the data in those files, and the third chops those plots into tiles for Google Maps.

The first step is just one task, but the second and third steps are groups of tasks, since there are dozens of plots made from each data file.

What I wanted to do was to have celerybeat call a task like

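from celery import chain, shared_task

@shared_task
def pipeline():
    # download, get_plot_task_group, spacer and tile are defined elsewhere.
    # Note: get_plot_task_group() is evaluated here, when the chain is built,
    # not after download has finished.
    job = chain(download.s(), get_plot_task_group().s(), spacer.s(), tile.s())
    result = job.apply_async()
    return result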

where the spacer is necessary so that Celery waits for every task in the plotting group to finish before tiling starts, as outlined in the question "celery - chaining groups and subtasks. -> out of order execution".

The problem, though, is that get_plot_task_group() needs information about the downloaded data files to build the group of tasks, but it is called immediately, when the chain is built, instead of after download.s() finishes. Is there a way to make Celery wait before calling that function?
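For comparison, here is the dependency being asked for, sketched in plain Python with concurrent.futures rather than Celery: the plot group is only built after download has returned, and tiling waits for every plot. All function and file names are hypothetical stand-ins. Blocking on .result() like this is fine in an ordinary thread pool, but the Celery equivalent (calling result.get() inside a task) is exactly what this question is trying to avoid, so in Celery the wait has to be expressed in the workflow itself rather than by blocking.

```python
from concurrent.futures import ThreadPoolExecutor


def download():
    # Hypothetical stand-in for the download step: returns the names of
    # the data files it fetched.
    return ["a.nc", "b.nc"]


def plot(data_file, variable):
    # Hypothetical plotting step: produces one plot image name.
    return f"{data_file}:{variable}.png"


def tile(plots):
    # Hypothetical tiling step: runs once, after every plot exists.
    return [f"tiles/{p}" for p in sorted(plots)]


def pipeline(variables=("temp", "wind")):
    with ThreadPoolExecutor() as pool:
        # Step 1 must finish first, because its output decides the group.
        data_files = pool.submit(download).result()
        # Step 2: build the group only now, one plot per (file, variable) pair.
        futures = [pool.submit(plot, f, v) for f in data_files for v in variables]
        # Barrier (the role of the spacer): wait for every plot to finish.
        plots = [fut.result() for fut in futures]
    # Step 3 sees the complete list of plots.
    return tile(plots)
```

Calling pipeline() returns ['tiles/a.nc:temp.png', 'tiles/a.nc:wind.png', 'tiles/b.nc:temp.png', 'tiles/b.nc:wind.png']; the size of the plot group is only known once download() has run.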

I also tried making get_plot_task_group itself a task that returns a group, but then I have no way to run that group, either inside the chain or after the chain finishes. I'd have to call result.get() to get the group object out, and calling result.get() inside a task is bad practice. I can't see a way to do it with callbacks either.

I'd appreciate any insight you have.

tl;dr: I have to create a group of tasks inside another task, and then run it, but I'm not sure how.



Answers (1)

Chillar Anand


Convert your group into a chord to achieve that. Here is a simple example.

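from celery import Celery, chain, chord, group

# Hypothetical app name; chords also need a result backend configured.
app = Celery('tasks')

@app.task
def dummy_task():
    return 'I am a dummy task'

@app.task
def add(x, y):
    return x + y

@app.task
def task_with_group_task():
    task1 = add.si(1, 2)
    group_task = group(add.si(i, i) for i in range(5))
    # A chord runs its header group in parallel, then the body
    # (dummy_task) once every header task has finished.
    task2 = chord(group_task, dummy_task.si())
    task3 = add.si(9, 9)

    # Build the chain and send it to the workers.
    pipeline = chain(task1, task2, task3)()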

Here task2 starts only after task1 completes, and task3 starts only after task2 (the whole group plus dummy_task) completes.
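To make that ordering concrete, the following plain-Python model (concurrent.futures, not Celery) replays the workflow above and logs the order in which steps finish: task1 first, the five group tasks in any order, then dummy_task as the chord body, then task3. The run_pipeline name and the logging are illustrative only; as with the .si() signatures, no step consumes the previous step's result.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def run_pipeline():
    """Plain-Python model of chain(task1, chord(group_task, dummy_task.si()), task3)."""
    log = []
    lock = threading.Lock()

    def step(name, value):
        # Record the order in which steps actually finish.
        with lock:
            log.append(name)
        return value

    with ThreadPoolExecutor(max_workers=5) as pool:
        # task1 = add.si(1, 2)
        pool.submit(step, "task1", 1 + 2).result()
        # Chord header: group(add.si(i, i) for i in range(5)), run in parallel.
        header = [pool.submit(step, f"group[{i}]", i + i) for i in range(5)]
        # The chord's barrier: block until every header task is done.
        wait(header)
        # Chord body: dummy_task.si()
        step("dummy_task", "I am a dummy task")
        # task3 = add.si(9, 9)
        result = step("task3", 9 + 9)
    return result, log
```

The group entries may appear in any order relative to each other, but never before task1 or after dummy_task.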

Note: I am not passing the result of one task to the next; the .si() (immutable) signatures ignore their parent's result.

