Reputation: 879
This is my first time using Google's Vertex AI Pipelines. I checked this codelab as well as this post and this post, on top of some links derived from the official documentation. I decided to put all that knowledge to work, in some toy example: I was planning to build a pipeline consisting of 2 components: "get-data" (which reads some .csv file stored in Cloud Storage) and "report-data" (which basically returns the shape of the .csv data read in the previous component). Furthermore, I was cautious to include some suggestions provided in this forum. The code I currently have, goes as follows:
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform
# Components section
@component(
packages_to_install=[
"google-cloud-storage",
"pandas",
],
base_image="python:3.9",
output_component_file="get_data.yaml"
)
def get_data(
bucket: str,
url: str,
dataset: Output[Dataset],
):
import pandas as pd
from google.cloud import storage
storage_client = storage.Client("my-project")
bucket = storage_client.get_bucket(bucket)
blob = bucket.blob(url)
blob.download_to_filename('localdf.csv')
# path = "gs://my-bucket/program_grouping_data.zip"
df = pd.read_csv('localdf.csv', compression='zip')
df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')
@component(
packages_to_install=["pandas"],
base_image="python:3.9",
output_component_file="report_data.yaml"
)
def report_data(
inputd: Input[Dataset],
):
import pandas as pd
df = pd.read_csv(inputd.path)
return df.shape
# Pipeline section
@pipeline(
# Default pipeline root. You can override it when submitting the pipeline.
pipeline_root=PIPELINE_ROOT,
# A name for the pipeline.
name="my-pipeline",
)
def my_pipeline(
url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
bucket: str = "my-bucket"
):
dataset_task = get_data(bucket, url)
dimensions = report_data(
dataset_task.output
)
# Compilation section
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path="pipeline_job.json"
)
# Running and submitting job
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run1 = aiplatform.PipelineJob(
display_name="my-pipeline",
template_path="pipeline_job.json",
job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
enable_caching=True,
)
run1.submit()
I was happy to see that the pipeline compiled with no errors, and managed to submit the job. However "my happiness lasted short", as when I went to Vertex AI Pipelines, I stumbled upon some "error", which goes like:
The DAG failed because some tasks failed. The failed tasks are: [get-data].; Job (project_id = my-project, job_id = 4290278978419163136) is failed due to the above error.; Failed to handle the job: {project_number = xxxxxxxx, job_id = 4290278978419163136}
I did not find any related info on the web, neither could I find any log or something similar, and I feel a bit overwhelmed that the solution to this (seemingly) easy example, is still eluding me.
Quite obviously, I don't what or where I am mistaking. Any suggestion?
Upvotes: 4
Views: 5190
Reputation: 11
Thanks for your writeup. Very helpful! I had the same error, but it turned out to be for a different reasons, so noting it here...
In my pipeline definition step I have the following parameters...
'''
def my_pipeline(bq_source_project: str = BQ_SOURCE_PROJECT,
bq_source_dataset: str = BQ_SOURCE_DATASET,
bq_source_table: str = BQ_SOURCE_TABLE,
output_data_path: str = "crime_data.csv"):
'''
My error was when I run the pipeline, I did not have these same parameters entered. Below is the fixed version... '''
job = pipeline_jobs.PipelineJob(
project=PROJECT_ID,
location=LOCATION,
display_name=PIPELINE_NAME,
job_id=JOB_ID,
template_path=FILENAME,
pipeline_root=PIPELINE_ROOT,
parameter_values={'bq_source_project': BQ_SOURCE_PROJECT,
'bq_source_dataset': BQ_SOURCE_DATASET,
'bq_source_table': BQ_SOURCE_TABLE}
'''
Upvotes: 1
Reputation: 879
With some suggestions provided in the comments, I think I managed to make my demo pipeline work. I will first include the updated code:
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from datetime import datetime
from google.cloud import aiplatform
from typing import NamedTuple
# Importing 'COMPONENTS' of the 'PIPELINE'
@component(
packages_to_install=[
"google-cloud-storage",
"pandas",
],
base_image="python:3.9",
output_component_file="get_data.yaml"
)
def get_data(
bucket: str,
url: str,
dataset: Output[Dataset],
):
"""Reads a csv file, from some location in Cloud Storage"""
import ast
import pandas as pd
from google.cloud import storage
# 'Pulling' demo .csv data from a know location in GCS
storage_client = storage.Client("my-project")
bucket = storage_client.get_bucket(bucket)
blob = bucket.blob(url)
blob.download_to_filename('localdf.csv')
# Reading the pulled demo .csv data
df = pd.read_csv('localdf.csv', compression='zip')
df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')
@component(
packages_to_install=["pandas"],
base_image="python:3.9",
output_component_file="report_data.yaml"
)
def report_data(
inputd: Input[Dataset],
) -> NamedTuple("output", [("rows", int), ("columns", int)]):
"""From a passed csv file existing in Cloud Storage, returns its dimensions"""
import pandas as pd
df = pd.read_csv(inputd.path+".csv")
return df.shape
# Building the 'PIPELINE'
@pipeline(
# i.e. in my case: PIPELINE_ROOT = 'gs://my-bucket/test_vertex/pipeline_root/'
# Can be overriden when submitting the pipeline
pipeline_root=PIPELINE_ROOT,
name="readcsv-pipeline", # Your own naming for the pipeline.
)
def my_pipeline(
url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
bucket: str = "my-bucket"
):
dataset_task = get_data(bucket, url)
dimensions = report_data(
dataset_task.output
)
# Compiling the 'PIPELINE'
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path="pipeline_job.json"
)
# Running the 'PIPELINE'
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run1 = aiplatform.PipelineJob(
display_name="my-pipeline",
template_path="pipeline_job.json",
job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
parameter_values={
"url": "test_vertex/pipeline_root/program_grouping_data.zip",
"bucket": "my-bucket"
},
enable_caching=True,
)
# Submitting the 'PIPELINE'
run1.submit()
Now, I will add some complementary comments, which in sum, managed to solve my problem:
Input[Dataset]
or Ouput[Dataset]
, adding the file extension is needed (and quite easy to forget). Take for instance the ouput of the get_data
component, and notice how the data is recorded by specifically adding the file extension, i.e. dataset.path + ".csv"
.Of course, this is a very tiny example, and projects can easily scale to huge projects, however as some sort of "Hello Vertex AI Pipelines" it will work well.
Thank you.
Upvotes: 4