Marco Abbatangelo

Reputation: 87

VertexAI Pipeline: How to use an output from a custom kfp component as input for google_cloud_pipeline_components?

I'm trying to write the Python code for a pipeline in Vertex AI using kfp components. I have a step where I create a system.Dataset object with the following component:

@component(base_image="python:3.9", packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "fsspec", "gcsfs"])
def create_dataframe(
    project: str,
    region: str,
    destination_dataset: str,
    destination_table_name: str,
    dataset: Output[Dataset],
):
    from google.cloud import bigquery

    # Read the source table from BigQuery into a pandas DataFrame
    client = bigquery.Client(project=project, location=region)
    dataset_ref = bigquery.DatasetReference(project, destination_dataset)
    table_ref = dataset_ref.table(destination_table_name)
    table = client.get_table(table_ref)
    train = client.list_rows(table).to_dataframe()

    # Reshape the data and add the target column
    train.drop("<list_of_columns>", axis=1, inplace=True)
    train['class'] = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1]

    # Write the result as a CSV to the output artifact's URI
    train.to_csv(dataset.uri)

Then I use the dataset as input for AutoMLTabularTrainingJobRunOp:

df = create_dataframe(project=project,
                      region=region,
                      destination_dataset=destination_dataset,
                      destination_table_name=destination_table_name,
)
    
# Training with AutoML
training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
            project=project,
            display_name="train-automl-task",
            optimization_prediction_type="classification",
            column_transformations=[
                "<nested_dict>",
            ],
            dataset=df.outputs["dataset"],
            target_column="class",
            budget_milli_node_hours=1000,
)

Looking at the logs, I found this error:

"Traceback (most recent call last): "

" File "/opt/python3.7/lib/python3.7/runpy.py", line 193, in _run_module_as_main "

" "__main__", mod_spec) "

" File "/opt/python3.7/lib/python3.7/runpy.py", line 85, in _run_code "

" exec(code, run_globals) "

" File "/opt/python3.7/lib/python3.7/site-packages/google_cloud_pipeline_components/remote/aiplatform/remote_runner.py", line 284, in <module> "

" main() "

" File "/opt/python3.7/lib/python3.7/site-packages/google_cloud_pipeline_components/remote/aiplatform/remote_runner.py", line 280, in main "

" print(runner(args.cls_name, args.method_name, executor_input, kwargs)) "

" File "/opt/python3.7/lib/python3.7/site-packages/google_cloud_pipeline_components/remote/aiplatform/remote_runner.py", line 236, in runner "

" prepare_parameters(serialized_args[METHOD_KEY], method, is_init=False) "

" File "/opt/python3.7/lib/python3.7/site-packages/google_cloud_pipeline_components/remote/aiplatform/remote_runner.py", line 205, in prepare_parameters "

" value = cast(value, param_type) "

" File "/opt/python3.7/lib/python3.7/site-packages/google_cloud_pipeline_components/remote/aiplatform/remote_runner.py", line 176, in cast "

" return annotation_type(value) "

" File "/opt/python3.7/lib/python3.7/site-packages/google/cloud/aiplatform/datasets/dataset.py", line 81, in __init__ "

" self._gca_resource = self._get_gca_resource(resource_name=dataset_name) "

" File "/opt/python3.7/lib/python3.7/site-packages/google/cloud/aiplatform/base.py", line 532, in _get_gca_resource "

" location=self.location, "

" File "/opt/python3.7/lib/python3.7/site-packages/google/cloud/aiplatform/utils/__init__.py", line 192, in full_resource_name "

" raise ValueError(f"Please provide a valid {resource_noun[:-1]} name or ID") "

"ValueError: Please provide a valid dataset name or ID "

So, I looked at the source code in google/cloud/aiplatform/utils/__init__.py at line 192 and found that the resource name should look like "projects/.../locations/.../datasets/12345" or "projects/.../locations/.../metadataStores/.../contexts/12345".

Opening the executor_output.json file that is created in my bucket after running create_dataframe, I discovered that the artifact name seems to be in the right format:

{"artifacts": {"dataset": {"artifacts": [{"name": "projects/my_project/locations/my_region/metadataStores/default/artifacts/1299...", "uri": "my_bucket/object_folder", "metadata": {"name": "reshaped-training-dataset"}}]}}}

I also tried to set a human-readable name for the dataset in metadata, but it did not work. Any suggestion would be really helpful.

Upvotes: 2

Views: 1902

Answers (2)

sina chavoshi

Reputation: 51

AutoMLTabularTrainingJobRunOp expects a managed dataset, which is not the same as the local dataset artifact type. You can turn the local data produced by your component into a managed dataset and then pass that to AutoMLTabularTrainingJobRunOp; see "AutoML Tabular pipelines using google-cloud-pipeline-components" for an example of how to do this, and the sketch below.
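A minimal sketch of that approach, assuming the CSV written by create_dataframe ends up at a GCS path that can be passed to the pipeline as a plain string (the gcs_source parameter and the display names are placeholders, not the asker's code):

from kfp.v2 import dsl
from google_cloud_pipeline_components import aiplatform as gcc_aip

@dsl.pipeline(name="automl-tabular-from-csv")
def pipeline(
    project: str,
    region: str,
    gcs_source: str,  # GCS URI of the training CSV (assumed to already exist)
):
    # Register the CSV as a managed Vertex AI TabularDataset
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project,
        display_name="reshaped-training-dataset",
        gcs_source=gcs_source,
    )

    # Train on the managed dataset instead of the raw system.Dataset artifact
    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name="train-automl-task",
        optimization_prediction_type="classification",
        dataset=dataset_create_op.outputs["dataset"],
        target_column="class",
        budget_milli_node_hours=1000,
    )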

Regarding why you were getting the error: the JSON you referenced above is the artifact that points to your dataset. The artifact itself is a managed resource, and hence it has a Vertex resourceName, which is projects/my_project/locations/my_region/metadataStores/default/artifacts/1299... (note the word artifacts in the resourceName). What the service expects here is for the URI to be a managed dataset resourceName, but it is currently set to my_bucket/object_folder. Once you create/use a managed dataset, the URI will look something like projects/my_project/locations/my_region/datasets/74225... (note the word datasets in the resourceName).
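For illustration, one way to see what a managed dataset's resourceName looks like is to create one directly with the Vertex AI Python SDK (a sketch outside the pipeline; the project, region, display name and GCS path are placeholders):

from google.cloud import aiplatform

aiplatform.init(project="my_project", location="my_region")

# Create a managed TabularDataset from a CSV already sitting in GCS
managed_ds = aiplatform.TabularDataset.create(
    display_name="reshaped-training-dataset",
    gcs_source=["gs://my_bucket/object_folder/data.csv"],
)

# Prints something like: projects/my_project/locations/my_region/datasets/1234567890
print(managed_ds.resource_name)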

One more note: lightweight Python components used to only support artifacts of the System type, i.e. system.Dataset, whereas google_cloud_pipeline_components uses artifacts of the Google type, i.e. google.Dataset. PR #8279 enables the use of Google types with lightweight Python components (see the sketch below).
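A rough sketch of what that enables on newer kfp / google-cloud-pipeline-components releases (this is an assumption about versions that include PR #8279 and the artifact_types module; the component name, resource name, and packages_to_install are placeholders):

from kfp import dsl
from google_cloud_pipeline_components.types import artifact_types

@dsl.component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-pipeline-components"],
)
def make_vertex_dataset(dataset: dsl.Output[artifact_types.VertexDataset]):
    # Point the google.VertexDataset artifact at an existing managed dataset;
    # downstream Google Cloud components can then consume this artifact directly.
    dataset.metadata["resourceName"] = (
        "projects/my_project/locations/my_region/datasets/1234567890"
    )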

Upvotes: 1

Raul Saucedo

Reputation: 1780

You can add the parameter dataset: Input[Dataset] to the signature of the component that consumes the dataset, as in this example (the component name is a placeholder):

def my_consuming_component(project: str,
                           region: str,
                           destination_dataset: str,
                           destination_table_name: str,
                           dataset: Input[Dataset],
):
    ...

You can also see more documentation on Vertex AI pipelines and on building pipelines with kfp.

Upvotes: 0
