James

Reputation: 4052

Install Custom Dependency for KFP Op

I'm trying to set up a simple KubeFlow pipeline, and I'm having trouble packaging dependencies in a way that works for KubeFlow.

The code simply downloads a config file and parses it, then passes back the parsed configuration.

However, in order to parse the config file, it needs access to another internal Python package.

I have a .tar.gz archive of the package hosted in a bucket in the same project, and I added the URL of the package as a dependency, but I get an error message saying tarfile.ReadError: not a gzip file.

I know the file is good, so it must be some intermediate issue with how the bucket serves the file or with the way KubeFlow installs dependencies.

Here is a minimal example:

import os

from kfp import compiler
from kfp import dsl
from kfp.components import func_to_container_op
from google.protobuf import text_format
from google.cloud import storage
import training_reader

def get_training_config(working_bucket: str,
                        working_directory: str,
                        config_file: str) -> training_reader.TrainEvalPipelineConfig:
    # download_file is an internal helper (omitted here) that fetches an object from GCS.
    download_file(working_bucket, os.path.join(working_directory, config_file), "ssd.config")
    pipeline_config = training_reader.TrainEvalPipelineConfig()
    with open("ssd.config", 'r') as f:
        text_format.Merge(f.read(), pipeline_config)
    return pipeline_config

config_op_packages = ["https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gz",
                      "google-cloud-storage",
                      "protobuf"
                      ]
training_config_op = func_to_container_op(get_training_config,
                                          base_image="tensorflow/tensorflow:1.15.2-py3",
                                          packages_to_install=config_op_packages)

def output_config(config: training_reader.TrainEvalPipelineConfig) -> None:
    print(config)

output_config_op = func_to_container_op(output_config)

@dsl.pipeline(
    name='Post Training Processing',
    description='Building the post-processing pipeline'
)
def ssd_postprocessing_pipeline(
    working_bucket: str,
    working_directory: str,
    config_file: str):
    config = training_config_op(working_bucket, working_directory, config_file)
    output_config_op(config.output)

pipeline_name = ssd_postprocessing_pipeline.__name__ + '.zip'
compiler.Compiler().compile(ssd_postprocessing_pipeline, pipeline_name)

Upvotes: 0

Views: 1816

Answers (1)

Ark-kun

Reputation: 6812

The URL https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gz requires authentication. Try to download it in Incognito mode and you'll see the login page instead of the file; that HTML page is what pip downloads, hence tarfile.ReadError: not a gzip file. Changing the URL to https://storage.googleapis.com/my_bucket/packages/training-reader-0.1.tar.gz works for public objects, but your object is not public.

The only thing you can do (if you cannot make the package public) is to use the google.cloud.storage library or the gsutil program to download the file from the bucket and then manually install it using subprocess.run([sys.executable, '-m', 'pip', 'install', ...]).
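A minimal sketch of that workaround, meant to run at the top of the component function (the bucket and object names are the ones from the question; google-cloud-storage must still be in packages_to_install, and the pod needs GCS credentials):

def get_training_config(working_bucket: str,
                        working_directory: str,
                        config_file: str):
    # Lightweight components run the function body in a fresh container,
    # so the imports have to live inside the function.
    import subprocess
    import sys
    from google.cloud import storage

    # Fetch the private sdist from GCS using the pod's credentials.
    storage.Client().bucket("my_bucket") \
        .blob("packages/training-reader-0.1.tar.gz") \
        .download_to_filename("training-reader-0.1.tar.gz")

    # Install it into this interpreter, then import as usual.
    subprocess.run([sys.executable, "-m", "pip", "install",
                    "training-reader-0.1.tar.gz"], check=True)
    import training_reader

    # ... rest of the original function body ...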

Where are you downloading the data from?

What's the purpose of

    pipeline_config = training_reader.TrainEvalPipelineConfig()
    with open("ssd.config", 'r') as f:
        text_format.Merge(f.read(), pipeline_config)
    return pipeline_config

Why not just do the following:

from kfp.components import OutputPath

def get_training_config(
    working_bucket: str,
    working_directory: str,
    config_file: str,
    output_config_path: OutputPath('TrainEvalPipelineConfig'),
):
    download_file(working_bucket, os.path.join(working_directory, config_file), output_config_path)
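That way the raw config file itself becomes the component's output artifact, and downstream components can read it without needing the training_reader package installed just to receive the data.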

Regarding "the way kubeflow installs dependencies": export your component to a loadable component.yaml and you'll see how KFP Lightweight components install dependencies:

training_config_op = func_to_container_op(
    get_training_config,
    base_image="tensorflow/tensorflow:1.15.2-py3",
    packages_to_install=config_op_packages,
    output_component_file='component.yaml',
)
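The generated component.yaml contains the full container command, including the exact pip install invocation produced from packages_to_install. You can also load the exported file back and use it like any other component, e.g.:

from kfp import components

# Reload the exported component definition and use it as an op in a pipeline.
training_config_op = components.load_component_from_file('component.yaml')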

P.S. Some small pieces of info:

The @dsl.pipeline decorator is not required unless you want to use the dsl-compile command-line program.

pipeline_name = ssd_postprocessing_pipeline.__name__ + '.zip'
compiler.Compiler().compile(ssd_postprocessing_pipeline, pipeline_name)

Did you know that you can just call kfp.Client(host=...).create_run_from_pipeline_func(ssd_postprocessing_pipeline, arguments={}) to run the pipeline right away?
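For example (a sketch; the host URL and argument values are placeholders):

import kfp

# Submits the pipeline function directly, skipping compile-and-upload.
client = kfp.Client(host='https://<your-kfp-endpoint>')
client.create_run_from_pipeline_func(
    ssd_postprocessing_pipeline,
    arguments={
        'working_bucket': 'my_bucket',
        'working_directory': 'my_dir',
        'config_file': 'ssd.config',
    },
)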

Upvotes: 1
