Gaëtan

Reputation: 863

Looping tasks in Prefect

I want to loop over a group of tasks repeatedly until a certain condition is met, and only then continue with the rest of the workflow.

What I have so far is this:

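# Loop task
import prefect
from prefect import Task
from prefect.engine.signals import LOOP

class MyLoop(Task):
    def run(self):
        # On the first iteration the context key is absent, so start at 1
        loop_res = prefect.context.get("task_loop_result", 1)
        print(loop_res)
        if loop_res >= 10:
            return loop_res
        # Re-run this task with the incremented value
        raise LOOP(result=loop_res + 1)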

But as far as I understand, this only loops a single task. Is there a way to jump back further in the flow and loop over several tasks at once?

Upvotes: 6

Views: 3271

Answers (2)

Gaëtan

Reputation: 863

The solution is simply to create a single task that itself creates a new flow with one or more parameters and calls flow.run(). For example:

import prefect
from prefect import Flow, Parameter, Task
from prefect.engine.signals import LOOP
from prefect.executors import LocalDaskExecutor  # prefect.engine.executors in older releases

class MultipleTaskLoop(Task):
    def run(self):
        # Get previous value (defaults to 1 on the first iteration)
        loop_res = prefect.context.get("task_loop_result", 1)

        # Create subflow
        with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
            x = Parameter('x', default=1)
            loop1 = print_loop()
            add = add_value(x)
            loop2 = print_loop()
            loop1.set_downstream(add)
            add.set_downstream(loop2)

        # Run subflow and extract the value returned by the add task
        subflow_res = flow.run(parameters={'x': loop_res})
        new_res = subflow_res.result[add].result

        # Loop
        if new_res >= 10:
            return new_res
        raise LOOP(result=new_res)

where print_loop simply prints "loop" in the output and add_value adds one to the value it receives.

Upvotes: 5

Clay Shwery

Reputation: 380

Unless I'm missing something, the answer is no.

Prefect flows are DAGs, and what you are describing (looping over multiple tasks in order again and again until some condition is met) would make a cycle, so you can't do it.

This may or may not be helpful, but you could try combining all of the tasks you want to loop over into one task, and loop within that task until your exit condition has been met.
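To make that suggestion concrete, here is a minimal sketch in plain Python, with no Prefect imports. It assumes the steps are the ones from the other answer (print "loop", add one, print "loop") and that the exit condition is reaching 10. The name run_until_done and its start and limit arguments are hypothetical, chosen only for illustration.

```python
def print_loop():
    # Stand-in for a step that only reports progress.
    print("loop")


def add_value(x):
    # Stand-in for a step that advances the loop state by one.
    return x + 1


def run_until_done(start=1, limit=10):
    """Body of the single combined task (hypothetical name).

    Runs the three steps in order, repeating until the value
    reaches the limit, then returns the final value.
    """
    value = start
    while value < limit:
        print_loop()
        value = add_value(value)
        print_loop()
    return value
```

In a flow, the body of run_until_done would presumably go inside the run method of one Task. The trade-off is that the flow then sees a single task, so the individual steps no longer get their own states or retries.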

Upvotes: 2
