I am currently using Flyte for a project. In it, I have a number of workflows (A, B, C, etc.). However, I recently identified a use case that requires me to choose at runtime which workflow runs first, A1 or A2.
After learning that you can't use if statements in a workflow, because of the error "Flytekit does not support Unary expressions or performing truth value testing", I concluded that I probably needed a @dynamic, since the choice is only known at runtime.
So I tried something like:
# ... Reference tasks are described up here ...

@dynamic
def start_node_generator(should_use_a1):
    if should_use_a1:
        node_to_use = create_node(node_a1_reference_task)
        conditional_val = 1
        return conditional_val, node_to_use
    else:
        node_to_use = create_node(node_a2_reference_task).with_overrides(name="spam")
        conditional_val = node_to_use.o1
        return conditional_val, node_to_use

@workflow
def main_workflow(should_use_a1):
    conditional_val, nodeA = start_node_generator(should_use_a1=should_use_a1)
    nodeB = create_node(nodeB_reference_task, conditional_val=conditional_val)
    nodeC = create_node(nodeC_reference_task)
    nodeA >> nodeB
    nodeB >> nodeC
However, I'm clearly thinking about this slightly wrong: you can't return Nodes from either a @task or a @dynamic, since they're not serializable.
How do you conditionally run A1 or A2 first, then the rest?
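For context, here is how I currently understand the truth-value error mentioned earlier. This is only a toy model in plain Python, not Flytekit code: the `Promise` class and `compile_workflow` helper are hypothetical names I made up for illustration. The assumption is that the workflow body runs once at compile time with placeholders instead of real inputs, so a plain `if` has nothing to test.

```python
class Promise:
    """Toy stand-in for a workflow input at compile time (hypothetical, not Flytekit's class)."""

    def __init__(self, name):
        self.name = name

    def __bool__(self):
        # The real value only exists once the workflow executes,
        # so a truth-value test cannot be answered here.
        raise ValueError(f"cannot test truth value of unresolved input {self.name!r}")


def main_workflow(should_use_a1):
    # A plain `if` calls bool() on its condition.
    if should_use_a1:
        return "A1"
    return "A2"


def compile_workflow(fn):
    # Hypothetical "compile" step: call the body once with a placeholder input.
    try:
        return fn(Promise("should_use_a1"))
    except ValueError as exc:
        return f"compile error: {exc}"


result = compile_workflow(main_workflow)
print(result)
```

Called with a real boolean, `main_workflow(True)` works as ordinary Python; it only fails when the input is a placeholder, which is what I believe happens inside an @workflow.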