Thrastylon
Thrastylon

Reputation: 980

Use multiple celery results as arguments for a subsequent task

I'm currently going through the Celery user guide which has the following example:

# ((4 + 16) * 2 + 4) * 8
>>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
>>> res = c2()
>>> res.get()
352

But this example only uses one partial result at a time. My question is whether it is possible to use several, with the same primitive tasks (add and mul), e.g.:

# ((4 + 16) * (2 + 4)) * 8
>>> c2 = ???
>>> res = c2()
>>> res.get()
960

Below are my tries so far:

# TypeError: mul() missing 1 required positional argument: 'y'
>>> c2 = (add.s(4, 16) | (add.si(2, 4) | mul.s()) | mul.s(8))

# Works, but needs to implement a new "prod" task
>>> c2 = (group(add.s(4, 16), add.s(2, 4)) | prod.s() | mul.s(8))

Where prod is simply:

@app.task
def prod(xs):
    return functools.reduce((lambda x, y: x * y), xs)

Upvotes: 3

Views: 976

Answers (1)

Niel Godfrey P. Ponciano
Niel Godfrey P. Ponciano

Reputation: 10709

You can achieve multiple celery results from multiple tasks at once by using groups. However as you already found out, it returns a single object only which is a sequence e.g. [20, 6]. Since it is only a single object (which is a sequence containing 2 items), thus you have to unwrap the sequence first before you can map it to the existing definitions of add or mul as what you did with your new prod task.

An alternative I can think of without using the intermediate prod task is by separating them into 2 chains.

>>> c1 = group(add.s(4, 16), add.s(2, 4))  # 4+16, 2+4
>>> c2 = mul.s(*c1().get()) | mul.s(8)  # (20 * 6) * 8
>>> c2().get()
960

Or you can choose to just update the implementation of mul to instead accept *args and handle the scenario of passing in a sequence e.g. list so that the call can either be mul(20, 6) or mul([20, 6]), allowing you to directly perform (group(add.s(4, 16), add.s(2, 4)) | mul.s() | mul.s(8)).

@app.task
def mul(*args):
    if len(args) == 1 and isinstance(args[0], collections.abc.Sequence):
        args = args[0]
    return functools.reduce((lambda x, y: x * y), args)

Upvotes: 1

Related Questions