I want a downstream component with an optional dsl.Dataset input. That optional input is the output of an upstream task inside a dsl.If. All of this sits inside a dsl.ParallelFor, so that I can consume everything with dsl.Collected afterwards. Something like:
from typing import List, Optional

from kfp import dsl


@dsl.component
def component_A_op() -> dsl.Dataset:
    return dsl.Dataset()


@dsl.component
def component_B_op(a_data: Optional[dsl.Dataset] = None) -> dsl.Dataset:
    # a_data is only available when component_A ran for this item
    if a_data:
        return dsl.Dataset()
    else:
        return dsl.Dataset()


@dsl.component
def component_C_op(datas: List[dsl.Dataset]):
    pass


@dsl.pipeline()
def pipeline():
    with dsl.ParallelFor(items=["item_1", "item_2"]) as _items:
        with dsl.If(_items == "item_1"):
            component_A = component_A_op()
        component_B = component_B_op(
            a_data=  # if component_A is present then component_A.output else None
        )
    # consume the outputs of every loop iteration
    component_C = component_C_op(datas=dsl.Collected(component_B.output))
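To make the intent concrete, here is a plain-Python sketch of the behaviour I'm after. It does not use KFP at all: the function names are hypothetical stand-ins for the components above, strings stand in for datasets, and the loop, conditional, and list only mimic what I'd expect dsl.ParallelFor, dsl.If, and dsl.Collected to do.

```python
from typing import List, Optional


def component_a() -> str:
    # Hypothetical stand-in for component_A_op: produces a "dataset" (a label).
    return "dataset_from_A"


def component_b(a_data: Optional[str] = None) -> str:
    # Hypothetical stand-in for component_B_op: the input is optional.
    return f"B({a_data})" if a_data else "B(no input)"


def component_c(datas: List[str]) -> int:
    # Hypothetical stand-in for component_C_op: consumes everything collected.
    return len(datas)


def run(items: List[str]) -> List[str]:
    collected = []
    for item in items:  # plays the role of dsl.ParallelFor
        # plays the role of dsl.If: A only runs for "item_1"
        a_out = component_a() if item == "item_1" else None
        collected.append(component_b(a_data=a_out))
    return collected  # plays the role of dsl.Collected


results = run(["item_1", "item_2"])
count = component_c(results)
```

So component_B should run for every item, receiving component_A's output when the branch was taken and None otherwise, and component_C should see one result per item.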
Is this possible?