Reputation: 41
I'm trying to run a Vertex AI Pipelines job where I skip a certain pipeline step if the value of a certain pipeline parameter (in this case do_task1) is False. But because another step runs unconditionally and expects the output of the potentially skipped step, I get the following error, regardless of whether do_task1 is True or False:
AssertionError: component_input_artifact: pipelineparam--task1-output_path not found. All inputs: parameters {
  key: "do_task1"
  value {
    type: STRING
  }
}
parameters {
  key: "task1_name"
  value {
    type: STRING
  }
}
It seems the compiler simply cannot find the output output_path from task1. So I wonder if there is any way to have some sort of placeholder for the outputs of steps that sit under a dsl.Condition, so that they get filled with default values unless the steps actually run and overwrite them with the non-default values.
The code below reproduces the problem and is easy to run. I'm using google-cloud-aiplatform==1.14.0 and kfp==1.8.11:
from typing import NamedTuple

from kfp import dsl
from kfp.v2.dsl import Dataset, Input, OutputPath, component
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs


@component(
    base_image="python:3.9",
    packages_to_install=["pandas"],
)
def task1(
    # inputs
    task1_name: str,
    # outputs
    output_path: OutputPath("Dataset"),
) -> NamedTuple("Outputs", [("output_1", str), ("output_2", int)]):
    import pandas as pd

    output_1 = task1_name + "-processed"
    output_2 = 2
    df_output_1 = pd.DataFrame({"output_1": [output_1]})
    df_output_1.to_csv(output_path, index=False)
    return (output_1, output_2)


@component(
    base_image="python:3.9",
    packages_to_install=["pandas"],
)
def task2(
    # inputs
    task1_output: Input[Dataset],
) -> str:
    import pandas as pd

    task1_input = pd.read_csv(task1_output.path).values[0][0]
    return task1_input


@dsl.pipeline(
    pipeline_root='pipeline_root',
    name='pipelinename',
)
def pipeline(
    do_task1: bool,
    task1_name: str,
):
    with dsl.Condition(do_task1 == True):
        task1_op = task1(
            task1_name=task1_name,
        )

    # runs unconditionally, but consumes an output produced inside the
    # condition above -- this is what the compiler rejects
    task2_op = task2(
        task1_output=task1_op.outputs["output_path"],
    )
if __name__ == '__main__':
    do_task1 = True  # <------------ The variable to modify ---------------

    # compile pipeline
    compiler.Compiler().compile(
        pipeline_func=pipeline, package_path='pipeline.json')

    # create pipeline run
    pipeline_run = pipeline_jobs.PipelineJob(
        display_name='pipeline-display-name',
        pipeline_root='pipeline_root',
        job_id='pipeline-job-id',
        template_path='pipeline.json',  # the file produced by the compiler above
        parameter_values={
            'do_task1': do_task1,  # compilation fails with either True or False
            'task1_name': 'Task 1',
        },
        enable_caching=False,
    )

    # execute pipeline run
    pipeline_run.run()
Any help is much appreciated!
Upvotes: 1
Views: 1264
Reputation: 8044
I found a workaround by introducing a dummy task in an else block. In your case you would also need dsl.OneOf to combine the outputs of the two branches.
@dsl.component(base_image="python:3.11")
def dummy(a: str) -> str:
    return "ok"


with dsl.Condition(do_task1 == True):
    task1_op = task1(
        task1_name=task1_name,
    )
with dsl.Else():
    dummy_op = dummy(a="dummy")

# exactly one branch runs, so OneOf resolves to whichever output exists
cond_output = dsl.OneOf(task1_op.outputs["output_path"], dummy_op.output)
task2_op = task2(task1_output=cond_output)
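Note that, as far as I know, dsl.Else and dsl.OneOf only exist in the KFP v2 SDK (kfp 2.x), so this workaround also requires upgrading from the kfp==1.8.11 pinned in the question.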
In my case, I did not need the output of the conditional task; task2 just had to run after task1 if task1 ran, and otherwise after the unconditional task that precedes it. With the else block in place, passing both tasks from within the with contexts to .after() was suddenly allowed. E.g.:
# Good
task2(...).after(task1_op, dummy_op)

# Fails with "Illegal task dependency across DSL context managers."
task2(...).after(task1_op)
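For reference, here is a minimal self-contained sketch of that pattern using dsl.If/dsl.Else (the non-deprecated spelling of dsl.Condition in recent kfp 2.x releases); the trivial components are only placeholders:

from kfp import dsl


@dsl.component(base_image="python:3.11")
def task1() -> str:
    return "task1 ran"


@dsl.component(base_image="python:3.11")
def dummy(a: str) -> str:
    return "ok"


@dsl.component(base_image="python:3.11")
def task2() -> str:
    return "task2 ran"


@dsl.pipeline(name="conditional-after")
def pipeline(do_task1: bool = True):
    with dsl.If(do_task1 == True):
        task1_op = task1()
    with dsl.Else():
        dummy_op = dummy(a="dummy")
    # Naming both branches makes the dependency legal; naming only task1_op
    # raises "Illegal task dependency across DSL context managers."
    task2().after(task1_op, dummy_op)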
Upvotes: 0
Reputation: 66
The real issue here is that dsl.Condition() creates a sub-group, where task1_op is an inner task only "visible" from within that sub-group. In the latest SDK, this throws a more explicit error message saying that task2 cannot depend on any inner task.
So to resolve the issue, you just need to move task2 inside the condition; if the condition is not met, you don't have a valid input to feed into task2 anyway.
with dsl.Condition(do_task1 == True):
    task1_op = task1(
        task1_name=task1_name,
    )
    task2_op = task2(
        task1_output=task1_op.outputs["output_path"],
    )
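For completeness, a sketch of the question's pipeline function with this fix applied (the rest of the repro script stays unchanged):

@dsl.pipeline(
    pipeline_root='pipeline_root',
    name='pipelinename',
)
def pipeline(
    do_task1: bool,
    task1_name: str,
):
    # both tasks now live in the same sub-group, so task2 may
    # legally consume task1's output artifact
    with dsl.Condition(do_task1 == True):
        task1_op = task1(task1_name=task1_name)
        task2_op = task2(task1_output=task1_op.outputs["output_path"])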
Upvotes: 2