Reputation: 71
I'm encountering an issue with an Airflow DAG task utilizing the BigQueryOperator. The task involves executing a SQL file (cast.sql) stored at "/home/airflow/gcs/dags/movies/sql/RRR/", and I'm passing parameters to the SQL file using the params argument in the BigQueryOperator.
sql_path = "/home/airflow/gcs/dags/movies/sql/RRR/cast.sql"
stage_latest_parameters = {
    "project_id": project_id,
    "dataset_name": dataset_name,
    "date": raw_latest_date,
    "source_table": source,
    "dest_table": dest,
}
sql_with_parameters_task = BigQueryOperator(
    task_id='sql_with_parameters_task',
    sql=sql_path,
    params=stage_latest_parameters,
    use_legacy_sql=False,
    dag=dag,
)
ERROR:
Exception rendering Jinja template for task 'sql_with_parameters_task', field 'sql'.
Template: '/home/airflow/gcs/dags/movies/sql/RRR/cast.sql'
...
jinja2.exceptions.TemplateNotFound: /home/airflow/gcs/dags/movies/sql/RRR/cast.sql
QUERY:
CREATE OR REPLACE TABLE `{project_id}.{dataset_name}.{dest_table}`
AS
SELECT *, {date} AS date
FROM `{project_id}.{dataset_name}.{source_table}`
Here, I don't need any templating. I just want to run the query, whose SQL file sits inside a GCS bucket, by passing the necessary parameters. What am I doing wrong? I appreciate your help.
Upvotes: 0
Views: 381
Reputation: 171
You must use the template_searchpath parameter of the DAG class. Give this parameter the path to the parent folder of the SQL script, like:
with DAG(
    dag_id='example_dag',
    template_searchpath='/home/airflow/gcs/dags/movies/sql/RRR/'
) as dag:
You can also give a list of folders instead of a single path. Afterwards, you can refer to the SQL script by its file name in BigQueryOperator or BigQueryInsertJobOperator.
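Putting it together, a minimal sketch of the fixed DAG might look like the following (the dag_id, start_date, and the placeholder parameter values are assumptions for illustration; the path, file name, and operator come from the question). Note also that inside cast.sql, values passed via params are referenced with Jinja syntax such as `{{ params.project_id }}`, not Python-format braces like `{project_id}`:

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Placeholder values standing in for the variables used in the question.
stage_latest_parameters = {
    "project_id": "my-project",
    "dataset_name": "movies",
    "date": "2023-01-01",
    "source_table": "cast_raw",
    "dest_table": "cast_latest",
}

with DAG(
    dag_id="movies_example_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    # Point Jinja at the folder that contains cast.sql, not at the file itself.
    template_searchpath="/home/airflow/gcs/dags/movies/sql/RRR/",
) as dag:
    sql_with_parameters_task = BigQueryOperator(
        task_id="sql_with_parameters_task",
        sql="cast.sql",  # resolved relative to template_searchpath
        params=stage_latest_parameters,
        use_legacy_sql=False,
    )
```

With this setup, cast.sql itself would use Jinja references, e.g. CREATE OR REPLACE TABLE `{{ params.project_id }}.{{ params.dataset_name }}.{{ params.dest_table }}` AS SELECT *, '{{ params.date }}' AS date FROM `{{ params.project_id }}.{{ params.dataset_name }}.{{ params.source_table }}`.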
Upvotes: 0