Reputation: 317
I have a Celery task queue that should calculate the result of (2 + 2) - 3.
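import time

# `app` is the existing Celery application instance.

@app.task()
def add(**kwargs):
    time.sleep(5)  # simulate slow work
    x, y = kwargs['add'][0], kwargs['add'][1]
    return x + y

@app.task()
def sub(*args, **kwargs):
    time.sleep(5)
    x = args[0]  # result of the previous task in the chain
    y = kwargs['sub'][0]
    return x - y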
Sample task data: kwargs = {'add': (2, 2), 'sub': (3,)}
Chaining the tasks: result = (add.s() | sub.s()).apply_async(kwargs=kwargs)
By design, apply_async only passes the kwargs to the first task in the chain, so sub never receives them. What do I need to change to achieve the desired outcome?
Upvotes: 3
Views: 4543
Reputation: 317
As of Celery v4.4.0rc4 there is no better way to do this than passing the kwargs to each task's signature, although it does look like Ask Solem (Celery dev) is open to a feature request.
This is what the chain should look like:
result = (add.s(job_data=job_data) | sub.s(job_data=job_data)).apply_async()
Note that each task now receives the dictionary as kwargs['job_data'], so the task bodies have to read their values from there.
However, since our chains have 10+ tasks, I had to come up with an easier way to write this.
from celery import chain

# Workflow generator: bind the same job_data to every task, then chain them.
def workflow_generator(task_list, job_data):
    _tasks = tuple(task.s(job_data=job_data) for task in task_list)
    return chain(*_tasks).apply_async()

taskList = [add, sub]
job_data = {'add': (2, 2), 'sub': (3,)}  # (3,) is a one-element tuple; (3) is just the int 3
result = workflow_generator(taskList, job_data)
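To see why the kwargs have to be bound on every signature, here is a rough plain-Python model of the data flow. It does not use Celery at all: FakeSignature and run_chain are hypothetical names made up for this sketch, and the task bodies are assumed to read from kwargs['job_data']. It only mimics the behaviour described above, where call-time kwargs reach the first task and each later task gets the previous result as its first positional argument.

```python
class FakeSignature:
    """Hypothetical stand-in for a Celery signature: a function plus bound kwargs."""

    def __init__(self, func, **kwargs):
        self.func = func
        self.kwargs = kwargs


def run_chain(signatures, kwargs=None):
    """Model a chain: call-time kwargs reach the first signature only;
    every later signature gets the previous result as its first argument."""
    result = None
    for index, sig in enumerate(signatures):
        call_kwargs = dict(sig.kwargs)
        if index == 0:
            call_kwargs.update(kwargs or {})
            result = sig.func(**call_kwargs)
        else:
            result = sig.func(result, **call_kwargs)
    return result


def add(**kwargs):
    x, y = kwargs['job_data']['add']
    return x + y


def sub(*args, **kwargs):
    # args[0] is the result handed over by the previous task
    return args[0] - kwargs['job_data']['sub'][0]


job_data = {'add': (2, 2), 'sub': (3,)}

# Call-time kwargs only: sub never receives job_data and fails.
try:
    run_chain([FakeSignature(add), FakeSignature(sub)],
              kwargs={'job_data': job_data})
    missing_key = None
except KeyError as exc:
    missing_key = exc.args[0]

# kwargs bound on every signature: the whole chain sees job_data.
result = run_chain([FakeSignature(add, job_data=job_data),
                    FakeSignature(sub, job_data=job_data)])

print(missing_key, result)  # job_data 1
```

The first call fails inside sub because nothing was bound to it, which is the same situation as apply_async(kwargs=...) on a chain. The second call gives (2 + 2) - 3 = 1.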
Upvotes: 3
Reputation: 15946
Why not just partially bind the tasks before chaining them?
result = (add.s(add=(2,2)) | sub.s(sub=(3,3))).apply_async()
Upvotes: 0