VanDough

Reputation: 41

Vertex AI Pipelines (Kubeflow) skip step with dependent outputs on later step

I’m trying to run a Vertex AI Pipelines job where I skip a pipeline step if the value of a pipeline parameter (in this case do_task1) is False. But because another step runs unconditionally and expects the output of the potentially skipped step, I get the following error, regardless of whether do_task1 is True or False:

AssertionError: component_input_artifact: pipelineparam--task1-output_path not found. All inputs: parameters {
  key: "do_task1"
  value {
    type: STRING
  }
}
parameters {
  key: "task1_name"
  value {
    type: STRING
  }
}

It seems like the compiler simply cannot find the output output_path from task1. So I wonder if there is some way to have placeholders for the outputs of steps that sit under a dsl.Condition, so that they are filled with default values unless the actual steps run and overwrite them. The code below reproduces the problem.

I'm using google-cloud-aiplatform==1.14.0 and kfp==1.8.11

from typing import NamedTuple

from kfp import dsl
from kfp.v2.dsl import Dataset, Input, OutputPath, component
from kfp.v2 import compiler

from google.cloud.aiplatform import pipeline_jobs

@component(
    base_image="python:3.9",
    packages_to_install=["pandas"]
)
def task1(
    # inputs
    task1_name: str,
    # outputs
    output_path: OutputPath("Dataset"),
) -> NamedTuple("Outputs", [("output_1", str), ("output_2", int)]):

    import pandas as pd
    
    output_1 = task1_name + "-processed"
    output_2 = 2

    df_output_1 = pd.DataFrame({"output_1": [output_1]})
    df_output_1.to_csv(output_path, index=False)

    return (output_1, output_2)

@component(
    base_image="python:3.9",
    packages_to_install=["pandas"]
)
def task2(
    # inputs
    task1_output: Input[Dataset],
) -> str:

    import pandas as pd

    task1_input = pd.read_csv(task1_output.path).values[0][0]

    return task1_input

@dsl.pipeline(
    pipeline_root='pipeline_root',
    name='pipelinename',
)
def pipeline(
    do_task1: bool,
    task1_name: str,
):

    with dsl.Condition(do_task1 == True):

        task1_op = (
            task1(
                task1_name=task1_name,
            )
        )

    task2_op = (
        task2(
            task1_output=task1_op.outputs["output_path"],
        )
    )


if __name__ == '__main__':
    
    do_task1 = True # <------------ The variable to modify ---------------

    # compile pipeline
    compiler.Compiler().compile(
        pipeline_func=pipeline, package_path='pipeline.json')

    # create pipeline run
    pipeline_run = pipeline_jobs.PipelineJob(
        display_name='pipeline-display-name',
        pipeline_root='pipeline_root',
        job_id='pipeline-job-id',
        template_path='pipeline.json',
        parameter_values={
            'do_task1': do_task1, # pipeline compilation fails with either True or False values
            'task1_name': 'Task 1',
        },
        enable_caching=False
    )
    
    # execute pipeline run
    pipeline_run.run()

Any help is much appreciated!

Upvotes: 1

Views: 1264

Answers (2)

dre-hh

Reputation: 8044

I found a workaround by introducing a dummy_task and an else block.

In your case, you would also need to use dsl.OneOf to combine the outputs.

@dsl.component(base_image="python:3.11")
def dummy(a: str) -> str:
    return "ok"

with dsl.Condition(do_task1 == True):
    task1_op = (
        task1(
            task1_name=task1_name,
        )
    )

with dsl.Else():
    dummy_op = dummy(a="dummy")

cond_output = dsl.OneOf(task1_op.outputs["output_path"], dummy_op.output)
task2_op = task2(task1_output=cond_output)
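
Applied to the pipeline in the question, a minimal sketch of that approach might look like the following. This assumes a kfp 2.x SDK (where dsl.If, dsl.Else and dsl.OneOf are available) and that task1/task2 have been ported to the 2.x component syntax; the dummy_task1 placeholder component and the default CSV it writes are hypothetical, added only so both branches expose a Dataset output that dsl.OneOf can combine.

from kfp import dsl
from kfp.dsl import Dataset, Output

# Hypothetical placeholder component: writes a default CSV so the
# downstream task always receives a readable Dataset when task1 is skipped.
@dsl.component(base_image="python:3.9", packages_to_install=["pandas"])
def dummy_task1(output_path: Output[Dataset]):
    import pandas as pd
    pd.DataFrame({"output_1": ["default"]}).to_csv(output_path.path, index=False)


@dsl.pipeline(name="pipelinename")
def pipeline(do_task1: bool, task1_name: str):
    with dsl.If(do_task1 == True):
        task1_op = task1(task1_name=task1_name)
    with dsl.Else():
        dummy_op = dummy_task1()

    # dsl.OneOf resolves to whichever branch actually ran.
    cond_output = dsl.OneOf(
        task1_op.outputs["output_path"],
        dummy_op.outputs["output_path"],
    )
    task2_op = task2(task1_output=cond_output)

This mirrors the "placeholder with default values" idea from the question: the else branch emits a default Dataset, and dsl.OneOf forwards whichever output actually exists to task2.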

In my case, I did not need the output of the conditional task. task2 just had to run after task1 if task1 ran; otherwise it should run after the unconditional task that precedes it. With the else block in place, referencing both tasks from the with contexts in .after() was suddenly allowed. E.g.:

# Good
task2(...).after(task1_op, dummy_op)

# Fails with "Illegal task dependency across DSL context managers."
task2(...).after(task1_op)

Upvotes: 0

chesu

Reputation: 66

The real issue here is with dsl.Condition(): it creates a sub-group, where task1_op is an inner task only "visible" from within that sub-group. The latest SDK throws a more explicit error message saying task2 cannot depend on any inner task. So to resolve the issue, you just need to move task2 inside the condition; if the condition is not met, you don't have a valid input to feed into task2 anyway.

    with dsl.Condition(do_task1 == True):

        task1_op = (
            task1(
                task1_name=task1_name,
            )
        )

        task2_op = (
            task2(
                task1_output=task1_op.outputs["output_path"],
            )
        )

Upvotes: 2
