Reputation: 863
I want to loop over tasks, again and again, until reaching a certain condition before continuing the rest of the workflow.
What I have so far is this:
# Loop task
class MyLoop(Task):
def run(self):
loop_res = prefect.context.get("task_loop_result", 1)
print (loop_res)
if loop_res >= 10:
return loop_res
raise LOOP(result=loop_res+1)
But as far as I understand this does not work for multiple tasks. Is there a way to come back further and loop on several tasks at a time ?
Upvotes: 6
Views: 3271
Reputation: 863
The solution is simply to create a single task that itself creates a new flow with one or more parameters and calls flow.run(). For example:
class MultipleTaskLoop(Task):
def run(self):
# Get previous value
loop_res = prefect.context.get("task_loop_result", 1)
# Create subflow
with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
x = Parameter('x', default = 1)
loop1 = print_loop()
add = add_value(x)
loop2 = print_loop()
loop1.set_downstream(add)
add.set_downstream(loop2)
# Run subflow and extract result
subflow_res = flow.run(parameters={'x': loop_res})
new_res = subflow_res.result[add]._result.value
# Loop
if new_res >= 10:
return new_res
raise LOOP(result=new_res)
where print_loop
simply prints "loop" in the output and add_value
adds one to the value it receives.
Upvotes: 5
Reputation: 380
Unless I'm missing something, the answer is no.
Prefect flows are DAGs, and what you are describing (looping over multiple tasks in order again and again until some condition is met) would make a cycle, so you can't do it.
This may or may not be helpful, but you could try and make all of the tasks you want to loop into one task, and loop within that task until your exit condition has been met.
Upvotes: 2