Reputation: 7951
I have a BigQuery dataset with multiple tables, which we call base or source tables. An external application appends data to these base tables, some periodically, some sporadically. I want an Airflow DAG that queries these source_tables daily and inserts the results into their counterpart bulk tables (named source_table + '_bulk') within the same BigQuery dataset, based on a formula that applies universally to all of them. The SQL file has a fixed query with a placeholder for the source_table.
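For concreteness, a minimal sketch of that substitution step, assuming the SQL file marks the table with a {source_table} token (the token name, file path, and helper name are purely illustrative):
# Hypothetical helper: read the fixed query and fill in the source table name.
# Assumes sql_file.sql contains something like:
#   SELECT * FROM `{project}.{dataset}.{source_table}` WHERE <universal formula>
def render_sql(sql_path, project, dataset, source_table):
    with open(sql_path) as f:
        template = f.read()
    return template.format(project=project, dataset=dataset, source_table=source_table)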
My DAG looks like this:
projectId = os.environ["GCP_PROJECT"]
dataset = <target-dataset>
dag = DAG(...)
selectInsertOp = BigQueryOperator(
    ...
    sql=<sed_the_source_table_placeholder('sql_file.sql')>,
    ...
    destination_dataset_table=source_table + '_bulk',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    dag=dag
)
selectInsertOp
As the source_tables are numerous (hundreds), how can I implement this without repeating the DAG file (and the corresponding SQL file)? I want this single DAG file to create multiple BigQueryOperator tasks.
Upvotes: 0
Views: 1093
Reputation: 3034
You can wrap the operator in a for loop like this, if you have the necessary list of tables.
for source_table in table_list:
    selectInsertOp = BigQueryOperator(
        ...
        sql=<sed_the_source_table_placeholder('sql_file.sql')>,
        ...
        destination_dataset_table=source_table + '_bulk',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        dag=dag
    )
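For a fuller picture, here is a minimal runnable sketch of that loop in a single DAG file; the table list, dataset name, and DAG settings are assumptions, and it swaps the sed-style substitution for the operator's built-in Jinja templating of .sql files (each task also needs a unique task_id):
import os
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

projectId = os.environ["GCP_PROJECT"]
dataset = 'my_dataset'                  # hypothetical target dataset
table_list = ['table_a', 'table_b']     # in practice: the hundreds of source tables

dag = DAG(
    dag_id='bulk_insert_daily',
    start_date=datetime(2020, 1, 1),
    schedule_interval='@daily',
)

for source_table in table_list:
    # task_id must be unique per task, so derive it from the table name.
    # 'sql_file.sql' is rendered with Jinja (.sql is a templated extension for
    # this operator), so the query can reference `{{ params.source_table }}`
    # instead of sed-ing a placeholder into the file.
    BigQueryOperator(
        task_id='select_insert_' + source_table,
        sql='sql_file.sql',
        params={'project': projectId, 'dataset': dataset, 'source_table': source_table},
        use_legacy_sql=False,
        destination_dataset_table='{}.{}.{}_bulk'.format(projectId, dataset, source_table),
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        dag=dag,
    )
The generated tasks all live in this one DAG and run independently; if ordering matters, they can still be chained inside the loop.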
Upvotes: 1