Reputation: 1117
I have a component that runs a BigQuery query. The pipeline accepts parameters used for generating the queries. The problem is that when I send the queries in a list, I don't get the right configuration. Here is a simple example of the pipeline:
import json
import os
import pathlib
from kfp import compiler, dsl
from kfp.v2.dsl import Dataset, Output, component
from kfp.dsl import Input, component, Model
from pipelines import generate_query
import google.cloud.aiplatform as aip
from pathlib import Path
from jinja2 import Template
def generate_query(input_file: Path, **replacements) -> str:
    """Render the SQL template at *input_file* with the given replacements.

    Args:
        input_file: Path to a Jinja2 SQL template file.
        **replacements: Template variables to substitute into the query.

    Returns:
        The fully rendered query string.
    """
    # Path.read_text() opens and closes the file for us; rendering is
    # delegated entirely to Jinja2.
    raw_template = input_file.read_text()
    return Template(raw_template).render(**replacements)
@component(
    base_image="python:3.8",
    packages_to_install=[
        "google-cloud-bigquery==2.30.0",
        "google-cloud-storage==2.13.0",
        "pytz==2023.3",
    ],
)
def extract_bq_to_dataset(
    bq_client_project_id: str,
    dataset: Output[Dataset],
    dataset_location: str = "EU",
    sequiential_queries: list = None,
    startup_query: str = None,
):
    """Run an optional startup query, then a list of queries in order.

    Args:
        bq_client_project_id: GCP project the BigQuery client bills against.
        dataset: KFP output artifact (declared for the pipeline wiring; not
            populated by the code shown here).
        dataset_location: BigQuery location for the client (default "EU").
        sequiential_queries: Optional list of SQL strings executed in order.
            (Parameter name kept as-is, typo included, so existing callers
            keep working.)
        startup_query: Optional SQL string executed before the list.
    """
    from google.cloud import bigquery

    job_config = bigquery.QueryJobConfig()
    client = bigquery.client.Client(
        project=bq_client_project_id,
        location=dataset_location,
    )

    def run_bigquery_query(query):
        # Submits the query and returns the QueryJob; as in the original,
        # we do not block on job completion here.
        return client.query(query, job_config=job_config)

    # BUG FIX: the original ran the startup query unconditionally, so the
    # default startup_query=None crashed client.query(None).
    if startup_query:
        run_bigquery_query(startup_query)

    if sequiential_queries:
        for query in sequiential_queries:
            run_bigquery_query(query)
@dsl.pipeline(name=f"{os.environ.get('MODEL_NAME','UNDEFINED')}-{os.environ.get('ENV','UNDEFINED')}")
def als_pipeline(
    bigquery_project_id: str = os.environ.get("BIGQUERY_PROJECT_ID"),
    training_files_destination: str = "gs://htz-data-pl-root/prod/als/train_data",
    dataset_id: str = "Recommendations",
    dataset_location: str = "EU",
    table_name: str = "train_data",
    project_id: str = os.environ.get("VERTEX_PROJECT_ID"),
    user_min_articles: int = 5,
    user_max_articles: int = 1000,
):
    """Pipeline that renders SQL templates and runs them in one component.

    Args:
        bigquery_project_id: Project holding the source BigQuery dataset.
        training_files_destination: GCS URI for exported training data.
        dataset_id: BigQuery dataset name.
        dataset_location: BigQuery location (default "EU").
        table_name: Destination table name for the startup query.
        project_id: Vertex AI project id (unused by the steps shown here).
        user_min_articles: Lower bound substituted into the SQL template.
        user_max_articles: Upper bound substituted into the SQL template.
    """
    table_path = f"`{bigquery_project_id}.{dataset_id}.{table_name}`"
    queries_folder = pathlib.Path(__file__).parent / "queries"

    # Render the template once and reuse it; the original rendered the same
    # startup_query.sql twice with identical arguments.
    startup_query = generate_query(
        queries_folder / "startup_query.sql",
        table_path=table_path,
        min_articles=user_min_articles,
        max_articles=user_max_articles,
    )
    bq_queries = [startup_query]

    # BUG FIX: the original call passed keyword arguments
    # (source_project_id, dataset_id, table_name, extract_job_config,
    # destination_gcs_uri) that the extract_bq_to_dataset component
    # defined in this file does not declare, so KFP would reject the call
    # at pipeline-compile time. Only declared parameters are passed now.
    # NOTE(review): if the production component actually declares those
    # parameters, restore them — confirm against the real signature.
    train_dataset = (
        extract_bq_to_dataset(
            bq_client_project_id=os.environ.get("BIGQUERY_PROJECT_ID"),
            dataset_location=dataset_location,
            sequiential_queries=bq_queries,
            startup_query=startup_query,
        )
        .set_cpu_limit('500m')
        .set_memory_limit('2G')
        .set_display_name("Extract train data to storage")
    ).outputs["dataset"]
Once I execute it, I can see the parameters in the console. The startup query gets the right parameter:
create or replace table
`{{$.inputs.parameters['pipelinechannel--bigquery_project_id']}}.{{$.inputs.parameters['pipelinechannel--dataset_id']}}.{{$.inputs.parameters['pipelinechannel--table_name']}}`
but the sequiential_queries parameter, which is a list of strings, gets the wrong configuration:
create or replace table `{{channel:task=;name=bigquery_project_id;type=String;}}.{{channel:task=;name=dataset_id;type=String;}}.{{channel:task=;name=table_name;type=String;}}`
The sequiential_queries therefore fail to run.
I don't want to split every query into a different component, and I don't want to send them as a single parameter.
Any suggestion?
Upvotes: 0
Views: 212