Sanket Sudake

Reputation: 771

Celery: generating group tasks from a chain task

I am trying to chain the following tasks with Celery (v4.0):

task = (group([tasks1.s(), task2.s()]) | generate_job_requests.s()
        | execute_job.map() | aggregate_result.s())
result = task.apply_async().get()

The part up to generate_job_requests works fine as a chord. The problem starts with execute_job: it receives a list of jobs from generate_job_requests, and for each job I need to spawn a parallel task, then aggregate the results of all the jobs.

I am trying to validate whether this kind of task graph is possible with Celery. Is there an alternative workflow that can handle such a dependency? Is there anything I am missing in the documentation?
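For context, the tasks have roughly these shapes (the bodies are placeholders to show the data flow, not the real implementations):

from celery import shared_task

@shared_task
def tasks1():
    return "part-1"  # placeholder payload

@shared_task
def task2():
    return "part-2"  # placeholder payload

@shared_task
def generate_job_requests(parts):
    # Receives the group's results and returns a list of job specs.
    return [{"part": p, "n": i} for i, p in enumerate(parts)]

@shared_task
def execute_job(job):
    # Runs a single job; these should fan out in parallel.
    return job["n"]

@shared_task
def aggregate_result(results):
    # Combines the outputs of all execute_job runs.
    return sum(results)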

Upvotes: 1

Views: 1683

Answers (1)

Sanket Sudake

Reputation: 771

I used map-like functionality via an intermediate task that builds the group and acts like a chord:

from celery import group, shared_task, signature

@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
    # Clone the callback once per element of the iterable and run the
    # clones in parallel as a group, then chain end_task after the
    # group so it aggregates the results (body + callback of a chord).
    callback = signature(callback)
    end_task = signature(end_task)
    grp = group(callback.clone([arg]) for arg in it)
    return (grp | end_task)()
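Standalone, dmap can be exercised like this (square and total are throwaway tasks for illustration only):

from celery import shared_task

@shared_task
def square(x):
    return x * x

@shared_task
def total(results):
    return sum(results)

# Fans square out over the list in parallel, then runs total
# on the collected results.
res = dmap.delay([1, 2, 3], square.s(), total.s())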

With that, the task flow reduces to this:

task = (group([tasks1.s(), task2.s()])
        | generate_job_requests.s()
        | dmap.s(execute_job.s(), aggregate_result.s())).apply_async()

To get the final output of the overall task, a few tweaks were needed:

# task.id here is the id of the dmap task
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Extract the id of the actual aggregate_result task
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# This yields the actual output of the overall task
result.get()
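The double .get() is needed because the outer chain ends at dmap itself: dmap's stored result only describes the chord it launched, so the id of the final aggregate task has to be dug out of that structure. If you do this in several places, a small helper keeps it in one spot (the name and layout are my own sketch, assuming the same result shape as above):

def get_nested_result(app, outer_task_id, timeout=None):
    # Resolve the dmap task first; its stored result is a serialized
    # reference to the chord it spawned.
    outer_result = app.AsyncResult(outer_task_id).get(timeout=timeout)
    # The first element carries the id of the final aggregate task.
    inner_task_id = outer_result[0][0]
    return app.AsyncResult(inner_task_id).get(timeout=timeout)

result_value = get_nested_result(celery_app, task.id)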

I referred to this answer.

Upvotes: 0