Reputation: 13705
I'm using the BigQueryOperator extensively in my Airflow DAGs on Google Cloud Composer. For longer queries, it's better to put each query in its own .sql file rather than cluttering up the DAG with it. Airflow seems to support this for all SQL query operators, including the BigQueryOperator, as you can see in the documentation.
My question: after I've written my SQL statement in a .sql template file, how do I add it to Google Cloud Composer and reference it in a DAG?
Upvotes: 9
Views: 19217
Reputation: 236
I found an ideal fix for this question. In your DAG declaration you can set template_searchpath, which is the default path where Airflow will look up Jinja-templated files. To make this work in your Cloud Composer instance, set it as follows:
dag = DAG(
...
template_searchpath=["/home/airflow/gcs/plugins"],
)
Note that I used the plugins folder for this example. You can use your data folder instead or any folder you want to have inside your bucket.
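For completeness, here's a minimal sketch of how the whole thing might fit together; the file name my_query.sql, the use of the data folder, and the start date are my own assumptions, not part of the original answer:
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Sketch only: assumes my_query.sql has been uploaded to the bucket's data/ folder,
# which Cloud Composer mounts at /home/airflow/gcs/data on the workers.
dag = DAG(
    'templated_bq_example',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None,
    template_searchpath=['/home/airflow/gcs/data'],
)

# Because the data folder is on the search path, the operator can reference
# the template by its relative filename.
count_rows = BigQueryOperator(
    task_id='count_rows',
    sql='my_query.sql',
    use_legacy_sql=False,
    dag=dag,
)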
Upvotes: 12
Reputation: 192
We recently solved this using a similar strategy. The steps are: keep the .sql files in a source repository, clone that repo into the environment's data folder, add the queries folder to template_searchpath, and reference the file from the BigQueryOperator. Here's a minimal solution:
import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator

# Placeholder default args; the original assumes these are defined elsewhere.
default_dag_args = {
    'start_date': datetime.datetime(2018, 1, 1),
}

with models.DAG(
        'bigquery_dag',
        schedule_interval=None,
        template_searchpath=['/home/airflow/gcs/data/repo/queries/'],
        default_args=default_dag_args) as dag:

    # Remove any stale copy of the repo from the data folder.
    t1_clean_repo = bash_operator.BashOperator(
        task_id='clean_repo',
        bash_command='rm -rf /home/airflow/gcs/data/repo'
    )

    # Clone the Cloud Source Repository holding the queries and copy it into
    # the data folder, which is synced to the Composer workers.
    clone_command = """
    gcloud source repos clone repo --project=project_id
    cp -R repo /home/airflow/gcs/data
    """
    t2_clone_repo = bash_operator.BashOperator(
        task_id='clone_repo',
        bash_command=clone_command
    )

    # query.sql is resolved relative to template_searchpath at render time.
    t3_query = bigquery_operator.BigQueryOperator(
        task_id='query',
        sql='query.sql',
        use_legacy_sql=False,
        bigquery_conn_id='conn_id'
    )
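The snippet above doesn't show task ordering, but the t1/t2/t3 naming suggests the tasks are meant to run sequentially; a small addition (my assumption, not part of the original answer) would be:

# Assumed ordering: clean out the old copy, clone the repo fresh, then run the query.
t1_clean_repo >> t2_clone_repo >> t3_query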
We're taking advantage of a few important concepts here: using git clone (via a BashOperator) to pull the .sql files into the environment's bucket, and setting template_searchpath in the DAG arguments, expanding the search scope to include the data directory in the Cloud Storage Bucket.
Upvotes: 0
Reputation: 13705
After googling around and finding this related question, I've found a way to make this work (although it's not the ideal solution, as we'll see). Here is a working example with three pieces: the SQL template file itself, the DAG code that references it, and the gcloud command needed to upload the template to the right place.
(1) The SQL template file
This is just a text file whose filename ends with the .sql
extension. Let's say this file is called my-templated-query.sql
and contains:
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
(2) Referencing the template in the DAG file
To reference this template, create an operator like the following:
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

count_task = BigQueryOperator(
    task_id='count_rows',
    sql='/my-templated-query.sql')
(3) Adding the template file to Google Cloud Composer
It turns out that, by default, Airflow looks for template files in the dags folder. To upload our templated file to the dags folder, we run:
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql
You'll have to replace the env name, location, and source path accordingly.
It doesn't really seem right to upload all these templates to the dag folder. A better Airflow practice is to put your templates in their own folder, and specify the template_searchpath
parameter to point to it when you create your DAG. However, I'm not sure how to do this with Google Cloud Composer.
Update: I've realized it's possible to put subfolders in the DAG folder, which is useful for organizing large numbers of SQL templates. Let's say I put a SQL template file in DAG_FOLDER/dataset1/table1.sql. In the BigQueryOperator, I can then refer to it using sql='/dataset1/table1.sql'. If you have a subfolder with lots of files and other subfolders in it, you can also use the dags import command I show above to upload the entire subfolder recursively; just point it at the subfolder.
Upvotes: 5