MPA
MPA

Reputation: 1117

Vertex Ai pipelines - send parameter to pipeline in list of str

I have a component that runs a BigQuery query. The pipeline accepts parameters for generating the queries. The problem is that when I send the queries in a list, I don't get the right configuration. Here is a simple example of the pipeline:

import json
import os
import pathlib

from kfp import compiler, dsl
from kfp.v2.dsl import Dataset, Output, component
from kfp.dsl import Input, component, Model
from pipelines import generate_query
import google.cloud.aiplatform as aip
from pathlib import Path
from jinja2 import Template


def generate_query(input_file: Path, **replacements) -> str:
    """Render a Jinja2 SQL template into a concrete query string.

    Args:
        input_file: Path to the SQL template file.
        **replacements: Template variables substituted into the template.

    Returns:
        The rendered query as a string.
    """
    # Path.read_text() opens, reads and closes the file in one call;
    # Path(input_file) also tolerates a plain str being passed in.
    query_template = Path(input_file).read_text()
    return Template(query_template).render(**replacements)


@component(
    base_image="python:3.8",
    packages_to_install=[
        "google-cloud-bigquery==2.30.0",
        "google-cloud-storage==2.13.0",
        "pytz==2023.3"
        ],
)
def extract_bq_to_dataset(
    bq_client_project_id: str,
    dataset: Output[Dataset],
    dataset_location: str = "EU",
    sequiential_queries: list = None,
    startup_query: str = None,
):
    """Run an optional startup query, then a list of queries, in order, on BigQuery.

    Args:
        bq_client_project_id: GCP project the BigQuery client runs (and bills) under.
        dataset: Output dataset artifact placeholder (not written by this snippet).
        sequiential_queries: Queries executed in order after the startup query.
            (Spelling kept as-is: it is part of the component's interface.)
        dataset_location: BigQuery location for the client, e.g. "EU".
        startup_query: Optional query executed first (e.g. CREATE OR REPLACE TABLE).
    """
    from google.cloud import bigquery

    def run_bigquery_query(query):
        """Submit *query* and block until it completes so errors surface here."""
        query_job = client.query(query, job_config=job_config)
        # result() waits for the job to finish. Without it the job is only
        # submitted: later queries could start before this one completes,
        # and query errors would never be raised inside the component.
        query_job.result()
        return query_job

    job_config = bigquery.QueryJobConfig()
    client = bigquery.client.Client(
        project=bq_client_project_id,
        location=dataset_location
    )
    # startup_query defaults to None; client.query(None) would fail, so guard it.
    if startup_query:
        run_bigquery_query(startup_query)
    if sequiential_queries:
        for query in sequiential_queries:
            run_bigquery_query(query)

@dsl.pipeline(name=f"{os.environ.get('MODEL_NAME','UNDEFINED')}-{os.environ.get('ENV','UNDEFINED')}")
def als_pipeline(
    bigquery_project_id: str = os.environ.get("BIGQUERY_PROJECT_ID"),
    training_files_destination: str = "gs://htz-data-pl-root/prod/als/train_data",
    dataset_id: str = "Recommendations",
    dataset_location: str = "EU",
    table_name: str = "train_data",
    project_id: str = os.environ.get("VERTEX_PROJECT_ID"),
    user_min_articles: int = 5,
    user_max_articles: int = 1000,
):
    """KFP pipeline: render SQL templates and run them via extract_bq_to_dataset.

    Pipeline parameters (bigquery_project_id, dataset_id, table_name, ...) are
    KFP PipelineChannels at compile time, so anything built from them (such as
    table_path below) is resolved by the KFP backend, not by Python.
    """
    # Fully-qualified, backtick-quoted table reference built from pipeline params.
    table_path  = f"`{bigquery_project_id}.{dataset_id}.{table_name}`"
    bq_queries = []
    # SQL templates live next to this file, under ./queries.
    queries_folder = pathlib.Path(__file__).parent / "queries"

    # NOTE(review): this renders the same template (startup_query.sql) that is
    # also rendered into startup_query below — presumably a placeholder for the
    # real list of sequential queries; confirm intended.
    bq_queries.append(
         generate_query(
          queries_folder / "startup_query.sql",
          table_path=table_path,
          min_articles=user_min_articles,
          max_articles=user_max_articles,
        )
    )
    
    startup_query = generate_query(
          queries_folder / "startup_query.sql",
          table_path=table_path,
          min_articles=user_min_articles,
          max_articles=user_max_articles,
    )


    # NOTE(review): several kwargs here (source_project_id, dataset_id,
    # table_name, extract_job_config, destination_gcs_uri) are not parameters
    # of the extract_bq_to_dataset component shown above — verify this call
    # matches the actual component signature.
    train_dataset = (
        extract_bq_to_dataset(
            bq_client_project_id=os.environ.get("BIGQUERY_PROJECT_ID"),
            source_project_id=os.environ.get("BIGQUERY_PROJECT_ID"),
            dataset_id=dataset_id,
            table_name=table_name,
            dataset_location=dataset_location,
            extract_job_config={"destination_format": "PARQUET", "compression": "GZIP"},
            destination_gcs_uri=training_files_destination,
            sequiential_queries=bq_queries,
            startup_query=startup_query,
        )
        .set_cpu_limit('500m')
        .set_memory_limit('2G')
        .set_display_name("Extract train data to storage")
    ).outputs["dataset"]

Once we execute it, I can see the parameters in the console. The startup query gets the right parameter:

create or replace table 
`{{$.inputs.parameters['pipelinechannel--bigquery_project_id']}}.{{$.inputs.parameters['pipelinechannel--dataset_id']}}.{{$.inputs.parameters['pipelinechannel--table_name']}}`

but the sequiential_queries parameter, which is a list of str, gets the wrong configuration:

create or replace table `{{channel:task=;name=bigquery_project_id;type=String;}}.{{channel:task=;name=dataset_id;type=String;}}.{{channel:task=;name=table_name;type=String;}}`

The sequiential_queries failed to run. I don't want to split every query into a different component, and I don't want to send them as a single parameter. Any suggestions?

Upvotes: 0

Views: 212

Answers (0)

Related Questions