Gh0sT

Reputation: 317

Celery apply_async pass kwargs to all tasks in chain

A Celery task chain to calculate the result of (2 + 2) - 3.

import time

@app.task()
def add(**kwargs):
    time.sleep(5)
    x, y = kwargs['add']
    return x + y

@app.task()
def sub(*args, **kwargs):
    time.sleep(5)
    x = args[0]  # result of the previous task in the chain
    y = kwargs['sub'][0]
    return x - y

Sample task data: kwargs = {'add': (2, 2), 'sub': (3,)} (note the trailing comma: (3) is just the integer 3, while (3,) is a one-element tuple).
Chaining the tasks: result = (add.s() | sub.s()).apply_async(kwargs=kwargs)

By design, apply_async only applies the kwargs to the first task in the chain. What do I need to change so that every task receives them?
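Why only the first task sees the kwargs can be sketched with plain functions; `run_chain` below is a hypothetical stand-in for Celery's chain semantics, not Celery API:

```python
# Plain-Python sketch (no broker needed) of how a Celery chain moves data:
# apply_async kwargs reach only the first task, and each later task receives
# the previous task's return value as its first positional argument.

def add(**kwargs):
    x, y = kwargs['add']
    return x + y

def sub(*args, **kwargs):
    x = args[0]               # result of the previous task
    y = kwargs['sub'][0]
    return x - y

def run_chain(tasks, **kwargs):
    result = tasks[0](**kwargs)   # only the first task sees the kwargs
    for task in tasks[1:]:
        result = task(result)     # later tasks get just the prior result
    return result

try:
    run_chain([add, sub], add=(2, 2), sub=(3,))
except KeyError as missing:
    print('sub never saw its kwargs:', missing)
```

Running the sketch raises a KeyError inside `sub`, mirroring the problem described above: the second task never receives `'sub'` because the kwargs stop at the first link.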

Upvotes: 3

Views: 4543

Answers (2)

Gh0sT

Reputation: 317

As of Celery v4.4.0rc4 there is no better way to do this than passing kwargs to each task's signature, although Ask Solem (a Celery developer) does seem open to a feature request.

This is how the chain should look:

result = (add.s(job_data=job_data) | sub.s(job_data=job_data)).apply_async()

However, since our chains have 10+ tasks, I had to come up with a more concise way to write this.

from celery import chain

# Workflow generator: bind job_data into every task's signature, then chain them
def workflow_generator(task_list, job_data):
    _tasks = tuple(task.s(job_data=job_data) for task in task_list)
    return chain(*_tasks).apply_async()

task_list = [add, sub]
job_data = {'add': (2, 2), 'sub': (3,)}
result = workflow_generator(task_list, job_data)
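The arithmetic this workflow computes can be checked without a broker; `workflow` below is a hypothetical plain-Python analogue of the generator above, not Celery API:

```python
# Plain-Python check that binding job_data into every task's signature
# lets each step read its own inputs: (2 + 2) - 3 = 1.

def add(*args, job_data=None):
    x, y = job_data['add']
    return x + y

def sub(*args, job_data=None):
    return args[0] - job_data['sub'][0]

def workflow(task_list, job_data):
    result = None
    for task in task_list:
        args = () if result is None else (result,)
        result = task(*args, job_data=job_data)
    return result

print(workflow([add, sub], {'add': (2, 2), 'sub': (3,)}))  # → 1
```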

Upvotes: 3

2ps

Reputation: 15946

Why not just partially bind the tasks before chaining them?

result = (add.s(add=(2, 2)) | sub.s(sub=(3, 3))).apply_async()
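Pre-binding kwargs into each signature is the same idea as partial application; in plain Python (no broker, illustrative names) the equivalent is functools.partial:

```python
# Each task gets its own kwargs baked in up front, so the chain only has
# to thread the running result through as a positional argument.

from functools import partial

def add(**kwargs):
    x, y = kwargs['add']
    return x + y

def sub(result, **kwargs):
    return result - kwargs['sub'][0]

add_bound = partial(add, add=(2, 2))   # analogue of add.s(add=(2, 2))
sub_bound = partial(sub, sub=(3,))     # analogue of sub.s(sub=(3,))

print(sub_bound(add_bound()))  # → (2 + 2) - 3 = 1
```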

Upvotes: 0
