so12311

Reputation: 4229

Schedule Celery task to run after other task(s) complete

I want to accomplish something like this:

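results = []
for i in range(N):
    data = generate_data_slowly()
    # apply_async takes positional arguments as a tuple via args=
    res = tasks.process_data.apply_async(args=(data,))
    results.append(res)
# Hypothetical API: Celery has no collect(...).then(...); this is the wish.
# .s() passes a signature instead of calling the task right away.
celery.collect(results).then(tasks.combine_processed_data.s())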

i.e., launch asynchronous tasks over a long period of time, then schedule a dependent task that is executed only once all the earlier tasks have completed.

I've looked at things like chain and chord, but it seems like they only work if you can construct your task graph completely upfront.
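Stripped of Celery, what is being asked for is a barrier: tasks are submitted one at a time as data arrives, and a dependent step runs only once all of them have finished. For local threads, the standard library's concurrent.futures.wait provides exactly that, so the sketch below uses it as a stand-in to pin down the intended semantics. It is not a Celery API, and generate_data_slowly, process_data and combine_processed_data here are hypothetical placeholders with toy bodies.

```python
from concurrent.futures import ThreadPoolExecutor, wait
import time


# Hypothetical stand-ins for the real work (toy bodies).
def generate_data_slowly(i):
    time.sleep(0.01)  # simulate data that trickles in slowly
    return list(range(i + 1))


def process_data(data):
    return sum(data)


def combine_processed_data(results):
    return sum(results)


def run(n):
    futures = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for i in range(n):
            data = generate_data_slowly(i)  # produced one piece at a time
            # Launch processing as soon as each piece exists; the full set
            # of tasks is not known until the loop has finished.
            futures.append(pool.submit(process_data, data))
        # Barrier: block until every submitted task has finished.
        wait(futures)
        return combine_processed_data([f.result() for f in futures])


print(run(4))  # 10
```

In Celery terms the difficulty is that the barrier has to live on a worker rather than in the submitting process, which is what the answers below address.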

Upvotes: 4

Views: 2264

Answers (2)

so12311

For anyone interested, I ended up using this snippet:

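@app.task(bind=True, max_retries=None)
def wait_for(self, task_id_or_ids):
    # Accept either a single task id or a list of ids.
    if isinstance(task_id_or_ids, str):
        task_id_or_ids = [task_id_or_ids]
    # ready() needs a result backend to be configured.
    ready = all(app.AsyncResult(task_id).ready()
                for task_id in task_id_or_ids)

    if not ready:
        # Re-queue this task with exponential backoff; retry() raises,
        # so nothing after this line runs on this attempt.
        raise self.retry(countdown=2 ** self.request.retries)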
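With countdown=2**self.request.retries, the gap before each re-check doubles. self.request.retries is 0 on the first run, so the waits are 1, 2, 4, ... seconds and the total after k retries is 2**k - 1 seconds. The helper below just computes that schedule; backoff_delays and its optional cap are illustrative, not part of Celery or of the snippet above.

```python
def backoff_delays(n, cap=None):
    """Countdown in seconds used for each of the first n retries.

    Mirrors countdown=2**self.request.retries, where request.retries is
    0 on the first run. cap is a hypothetical upper bound, not part of
    the original snippet.
    """
    delays = []
    for retries in range(n):
        delay = 2 ** retries
        if cap is not None:
            delay = min(delay, cap)
        delays.append(delay)
    return delays


print(backoff_delays(6))          # [1, 2, 4, 8, 16, 32]
print(sum(backoff_delays(6)))     # 63
print(backoff_delays(6, cap=10))  # [1, 2, 4, 8, 10, 10]
```

One consequence: if the awaited tasks finish just after a check, wait_for can sit idle for roughly as long as it has already been polling, and with max_retries=None nothing bounds that. Capping the countdown, e.g. countdown=min(2 ** self.request.retries, 60), keeps the worst-case lag fixed at the cost of more frequent polling.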

Then I wrote the workflow roughly like this:

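task_ids = []
for i in range(N):
    # Each chain generates and then processes one chunk of data.
    task = generate_data_slowly.si(i) | process_data.si(i)
    # A chain's AsyncResult refers to its last task (process_data).
    task_ids.append(task.delay().task_id)

# Use a signature (.si), not a direct call: wait_for(task_ids) would run
# the task body immediately in this process instead of on a worker.
final_task = wait_for.si(task_ids) | combine_processed_data.si()

final_task.delay()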

Upvotes: 3

Nuno André

Reputation: 5349

That way you would be running your tasks synchronously.

The solution depends entirely on how and where the data are collected. Roughly, given that each generate_data_slowly call feeds exactly one tasks.process_data call, a better approach would be to join both steps in one task (or a chain) and run N of those as a group.

A chord lets you attach a callback to that group; the callback runs only after every task in the group has finished.

The simplest example would be:

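from celery import chord

@app.task
def getnprocess_data():
    # Generate and process one chunk of data inside a single task.
    data = generate_data_slowly()
    return whatever_process_data_does(data)

# N independent header tasks; the callback receives the list of their results.
header = [getnprocess_data.s() for _ in range(N)]
callback = combine_processed_data.s()

# .get() blocks until the callback finishes; don't call it from inside a task.
chord(header)(callback).get()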
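In a chord, the callback runs once, after every header task has finished, and receives the header results as a list as its first argument, which is why combine_processed_data.s() is built without explicit arguments here. The local analog below mirrors that contract with concurrent.futures; chord_like and the toy header are illustrative, not Celery code. It also makes the trade-off against the question visible: the header list must be complete before anything is dispatched.

```python
from concurrent.futures import ThreadPoolExecutor


def chord_like(header, callback, max_workers=4):
    """Local analog of a chord: run every zero-argument callable in
    header concurrently, then pass the list of results to callback."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fn) for fn in header]
        # result() blocks, so the callback cannot run until every header
        # call has finished; this list follows header order.
        results = [f.result() for f in futures]
    return callback(results)


# Hypothetical header: each entry squares its own index.
header = [lambda i=i: i * i for i in range(4)]
print(chord_like(header, sum))   # 14
print(chord_like(header, list))  # [0, 1, 4, 9]
```

Here the results list follows header order; whether Celery delivers chord results in the same order can depend on the result backend and version, so check before relying on it.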

Upvotes: 1
