Reputation: 6572
This question is the same as this one: How to chain a Celery task that returns a list into a group? except that I need this to happen in the middle of a chain, and the accepted solution only works if the intermediate task is the final "link" in the chain.
Here is the same example, slightly modified, that reproduces the issue:
from random import random
from celery import chain, group, subtask

@app.task
def get_list(amount):
    return [i for i in range(amount)]

@app.task
def process_item(item):
    return [f'id-{item}', random() > .5]

@app.task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()
@app.task
def handle_results(results):
    for result in results:
        if result[1] == None:
            continue
        return result[1]  # return the first True value
def foo():
    return chain(
        get_list.s(10),
        dmap.s(process_item.s()),
        handle_results.s()  # <-- if I add this, it fails
    )
# in a terminal, or somewhere
foo()()
The error I'm getting is this:
File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 179, in default raise TypeError(f'Object of type {o.class.name} ' kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable
That is the return value of dmap, after all, and no, it can't be serialized. But note that if I did this:
>>> lst = [i for i in range(amount)]
>>> chain(group(process_item.s(i) for i in lst), handle_results.s())
then that would work. I'm confused about what actually needs to be passed from one member of the chain to the next, since the result of group(...) is:
>>> from app.manager_tasks import process_item
>>> group(process_item.s(e) for e in [1, 2, 3, 4])
group([app.manager_tasks.process_item(1), process_item(2), process_item(3), process_item(4)])
>>> group(process_item.s(e) for e in [1, 2, 3, 4]).delay()
<GroupResult: 07c9be1a-b3e3-4da2-af54-7177f3d91d0f [cf777f54-4763-46bd-a405-2c1993ddbf66, 103298fc-8f1f-4183-ba45-670224fcd319, 3ad87c2c-7b64-4309-a61b-e53ae17302b9, bf2766a3-662a-415d-a35b-037a0476f4a4]>
which is a GroupResult itself (with delay called; otherwise just a group). Since dmap is itself a signature, I'm guessing that's why delay() needs to be called inside of it for the chain.. 🤔
If I invoke the result as done in the examples from the other Stack Overflow question (same link as above), I'm left with a GroupResult, which only succeeds if it's the last member of the chain ((), .delay(), .apply_async()). If I call .get() on the GroupResult to get something serializable, I get the following error: RuntimeError: Never call result.get() within a task!
Which presents me with a conundrum: how can I accomplish this?
Pretty stumped on this one, but I'm also new to Celery. I'd really appreciate any advice on how I could/should solve this!
A bit more background: I intend to use this chain repeatedly as part of another chain which sits at the top level, specifying stages in a pipeline.
Upvotes: 3
Views: 2241
Reputation: 1
I had a similar problem. (celery 5.4.0)
[Image: Dynamic Chord inside Chain (https://i.sstatic.net/CK4HVnrk.png)]
Task_1 finds out how many chains the following chord needs, so the chord has to be defined dynamically (see "Designing Dynamic Workflows with Celery and Python").
I solved it by adding a chord_creator task to the main chain.
@celery.shared_task(bind=True)
def chord_creator(self, list_, task_2, task_3, agrup_result):
    def get_chain(element):
        subtask_2 = celery.subtask(task_2).clone([element,])
        subtask_3 = celery.subtask(task_3)
        mychain = celery.chain(subtask_2, subtask_3)
        return mychain

    subtask_agrup = celery.subtask(agrup_result)
    mychord = celery.chord([get_chain(element=arg) for arg in list_], subtask_agrup)
    return self.replace(mychord)
bind=True allows access to self inside the task.
self.replace replaces the current task with the newly created chord, so the chord runs in its place.
Task signatures that arrive through task parameters have been serialized to dicts, so they need to be converted back with subtask().
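As a small illustration of that last point: a Signature is a dict subclass, so after the JSON round trip it arrives inside the receiving task as a plain dict, and subtask() rebuilds it. A minimal sketch with a hypothetical add task:
import celery

@celery.shared_task
def add(x, y):
    return x + y

sig_dict = dict(add.s(2))       # roughly what the receiving task sees: a plain dict
sig = celery.subtask(sig_dict)  # back to a Signature that can be cloned and chained
sig.clone([3,])                 # cloning works again, just like in chord_creator
With that in place, the main chain is assembled like this: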
work_flow = celery.chain(
    [
        task_1.s(),
        task_2.s(),
        chord_creator.s(task_2=task_2.s(), task_3=task_3.s(), agrup_result=agrup_result.s())
    ]
)
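A minimal launch sketch, assuming task_1 takes no arguments and a result backend is configured:
result = work_flow.delay()
# task_1 -> task_2 -> chord_creator; chord_creator replaces itself with the
# dynamically built chord, so agrup_result ends up receiving the grouped results.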
Upvotes: 0
Reputation: 28
As I could not get the accepted answer to work, here is another take on this, where the chord is dealt with inside the dmap function.
Here is a reproducible example:
from celery import Celery, subtask, group

app = Celery('tasks', backend='redis://', broker='redis://')

@app.task
def get_list(n):
    return [i for i in range(n)]

@app.task
def process_item(item):
    return f'id-{item}'

@app.task
def handle_results(results):
    return ' - '.join(results)

@app.task
def dmap(it, callback, chord_callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    chord_callback = subtask(chord_callback)
    final_res = (group(callback.clone((arg,)) for arg in it) |
                 chord_callback)()
    return final_res

if __name__ == "__main__":
    pipeline = (get_list.s(10) |
                dmap.s(process_item.s(), handle_results.s()))()
    task_ids = pipeline.get()
    chord_task_id = task_ids[0][0]
    print(app.AsyncResult(chord_task_id).get())
    # id-0 - id-1 - id-2 - id-3 - id-4 - id-5 - id-6 - id-7 - id-8 - id-9
What happens here is:
1. The get_list task produces a 0:9 range, which is chained with the dmap task.
2. The dmap task deserializes the two callback signatures and runs a chord asynchronously, where the first callback (here, turning each element into an "id-..." label) is applied to each element of the list, and the second one is subsequently applied to the list of results (here, joining the labels into a single string). The chord is implicit here, as "Chaining a group together with another task will automatically upgrade it to be a chord" (from the Celery docs); see the sketch below.
3. The final joined string is retrieved with get from the chord's task id.
Tested on Celery v5.2.7.
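For reference, a small sketch of that implicit upgrade written both ways, using the tasks from this answer (it assumes a worker and the Redis backend from the snippet above are running); both forms dispatch the same chord:
from celery import chord, group

items = list(range(3))

# explicit chord: header of process_item calls, handle_results as the body
chord((process_item.s(i) for i in items), handle_results.s()).delay()

# group chained into a task: Celery upgrades this to a chord automatically
(group(process_item.s(i) for i in items) | handle_results.s()).delay()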
Upvotes: 0
Reputation: 6572
As @DejanLekic mentioned, I should have been using a chord. This would solve the above:
def foo():
    return chord(
        get_list.s(10),
        dmap.s(process_item.s())
    )(handle_results.s())
I had wanted this to still be part of a chain, but it doesn't look like that is supported right now.
The below is less related to the question, though possibly useful to some.
Using the solution from that GitHub issue thread, I can still do what I need (now that the primary question is figured out) by nesting chords and chains. It's not the cleanest, but it works; it would look like this:
def foo():
    return chord(
        get_list.s(10),
        dmap.s(process_item.s())
    )(chain(handle_results.s(), log_stuff.s()))
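For completeness, log_stuff here is just another task that receives the output of handle_results as the last link of the nested chain; a hypothetical definition could look like:
@app.task
def log_stuff(result):
    # receives whatever handle_results returned
    print(f'pipeline finished with: {result}')
    return result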
Upvotes: 1