Reputation: 4229
I want to accomplish something like this:
results = []
for i in range(N):
    data = generate_data_slowly()
    res = tasks.process_data.apply_async(data)
    results.append(res)

celery.collect(results).then(tasks.combine_processed_data())
i.e. launch asynchronous tasks over a long period of time, then schedule a dependent task that will only be executed once all the earlier tasks are complete.
I've looked at things like chain and chord, but it seems like they only work if you can construct your task graph completely upfront.
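For context, this is what makes chord awkward here: its complete header has to exist before it is dispatched, so every slow generation call would have to finish first (a minimal sketch, assuming process_data takes the generated data as its argument):

from celery import chord

# The whole header must be built before the chord is sent, which means all
# the generate_data_slowly() calls run up front, in the caller.
header = [tasks.process_data.s(generate_data_slowly()) for i in range(N)]
chord(header)(tasks.combine_processed_data.s())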
Upvotes: 4
Views: 2264
Reputation: 4229
For anyone interested, I ended up using this snippet:
@app.task(bind=True, max_retries=None)
def wait_for(self, task_id_or_ids):
    # Accepts either a single task id or an iterable of task ids.
    try:
        ready = app.AsyncResult(task_id_or_ids).ready()
    except TypeError:
        ready = all(app.AsyncResult(task_id).ready()
                    for task_id in task_id_or_ids)
    if not ready:
        # Poll with exponential backoff until everything has finished.
        self.retry(countdown=2 ** self.request.retries)
And writing the workflow something like this:
task_ids = []
for i in range(N):
    task = (generate_data_slowly.si(i) |
            process_data.si(i))
    task_id = task.delay().task_id
    task_ids.append(task_id)

final_task = (wait_for.si(task_ids) |
              combine_processed_data.si())
final_task.delay()

(Note the wait_for.si(task_ids): chaining needs a signature, not a direct call, since calling the task directly would execute it in the current process.)
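One detail worth noting: wait_for only confirms completion and passes nothing along, and combine_processed_data.si() ignores any parent result. A hypothetical variant (not part of the original snippet) hands the ids to the callback so it can fetch the finished results itself:

final_task = (wait_for.si(task_ids) |
              combine_processed_data.si(task_ids))

@app.task
def combine_processed_data(task_ids):
    # All upstream tasks are ready by the time wait_for lets this run,
    # so the results can simply be looked up by id.
    return [app.AsyncResult(task_id).result for task_id in task_ids]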
Upvotes: 3
Reputation: 5349
That way you would be running your tasks synchronously.

The solution depends entirely on how and where data are collected. Roughly, given that generate_data_slowly and tasks.process_data run synchronously, one right after the other, a better approach would be to join both in one task (or a chain) and to group those tasks. chord will then let you attach a callback to that group.
The simplest example would be:
from celery import chord

@app.task
def getnprocess_data():
    # Generate and process in a single task so the pair can be grouped.
    data = generate_data_slowly()
    return whatever_process_data_does(data)

header = [getnprocess_data.s() for i in range(N)]
callback = combine_processed_data.s()
chord(header)(callback).get()
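Note that the trailing .get() blocks the calling process until the whole chord has finished. If you only want to fire it off, keep the AsyncResult instead and block later, only when the combined result is actually needed (a small sketch, same assumptions as above):

result = chord(header)(callback)    # returns an AsyncResult immediately
combined = result.get(timeout=600)  # block only at the point of use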
Upvotes: 1