code11
code11

Reputation: 2309

How to have conditional workflow execution in flyte?

I am currently using flyte for a project. In it, I have a number of workflows, A,B,C etc. However, I recently identified a use case that requires me to change which workflow I run first, A1 or A2.

After learning you can't use if statements in a workflow due to

Flytekit does not support Unary expressions or performing truth value testing

I determined that I probably needed to use dynamics since the execution is being determined at runtime.

So I tried something like:

#...Reference tasks are described up here...

@dynamic
def start_node_generator(should_use_a1):
    if should_use_a1:
        node_to_use = create_node(node_a1_reference_task)
        conditional_val=1
        return conditional_val, node_to_use
    else:
        node_to_use = create_node(node_a2_reference_task).with_overrides(name="spam")
        conditional_val=node_to_use.o1
        return conditional_val, node_to_use
        
@workflow
def main_workflow(should_use_a1):
    conditional_val, nodeA = start_node_generator(should_use_a1=should_use_a1)
    nodeB = create_node(nodeB_reference_task, conditional_val=conditional_val)
    nodeC = create_node(nodeC_reference_task)
    nodeA >> nodeB
    nodeB >> nodeC

However, I'm clearly thinking about this slightly wrong. You can't return Nodes out of either @tasks or @dynamics since they're not serializable.

How do you conditionally run A1 or A2 first, then the rest?

Upvotes: 1

Views: 708

Answers (1)

Related Questions