oikonomiyaki

Reputation: 7951

Airflow DAG to apply a query to multiple BigQuery tables in a dataset

I have a BigQuery dataset with multiple tables; let's call them base or source tables. An external application appends data to these base tables, some periodically, some sporadically. I want an Airflow DAG that queries these source tables daily and inserts the results into counterpart bulk tables (named source_table + '_bulk') within the same BigQuery dataset, based on a formula that applies universally to all of them. The SQL file contains a fixed query with a placeholder for the source table.

My DAG looks like this:

projectId = os.environ["GCP_PROJECT"]
dataset = <target-dataset>

dag = DAG(...)

selectInsertOp = BigQueryOperator(
    ...
    sql=<sed_the_source_table_placeholder('sql_file.sql')>,
    ...
    destination_dataset_table=source_table + '_bulk',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    dag=dag
)

selectInsertOp
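
(Here sed_the_source_table_placeholder is pseudocode for whatever substitutes the table name into the fixed query; a hypothetical Python equivalent, assuming the SQL file marks the table with a literal {source_table} token, might be:)

def sed_the_source_table_placeholder(sql_path, source_table):
    # hypothetical helper: read the fixed query and drop in the table name
    with open(sql_path) as f:
        template = f.read()
    return template.format(source_table=source_table)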

As the source_tables are numerous (hundreds), how can I implement this without repeating the DAG file (and the corresponding SQL file)? I want this single DAG file to create multiple BigQueryOperator tasks.

Upvotes: 0

Views: 1093

Answers (1)

Daniel Zagales

Reputation: 3034

If you have the list of tables, you can wrap the operator in a for loop like this:

for source_table in table_list:
    selectInsertOp = BigQueryOperator(
        # give each task a unique task_id, e.g. derived from source_table
        ...
        sql=<sed_the_source_table_placeholder('sql_file.sql')>,
        ...
        destination_dataset_table=source_table + '_bulk',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        dag=dag,
    )
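
For completeness, a fuller self-contained sketch of that loop might look like the following. The DAG arguments, dataset name, and table list are placeholders; instead of sed-ing the file, this version leans on Airflow's built-in Jinja templating (sql is a templated field, and a path ending in .sql is read and rendered before the query runs, so the file can reference {{ params.source_table }}). The important detail is that every task generated in the loop needs its own unique task_id:

import os
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

project_id = os.environ["GCP_PROJECT"]
dataset = "my_dataset"               # placeholder: target dataset
table_list = ["table_a", "table_b"]  # placeholder: the hundreds of source tables

dag = DAG(
    dag_id="bq_select_insert_bulk",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)

for source_table in table_list:
    # derive a unique task_id from the table name; duplicate ids are rejected
    BigQueryOperator(
        task_id="select_insert_{}".format(source_table),
        # rendered from sql_file.sql, which can reference {{ params.source_table }}
        sql="sql_file.sql",
        params={"source_table": "{}.{}.{}".format(project_id, dataset, source_table)},
        use_legacy_sql=False,
        destination_dataset_table="{}.{}.{}_bulk".format(project_id, dataset, source_table),
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="WRITE_APPEND",
        dag=dag,
    )

Each iteration registers an independent task on the DAG, so all the per-table copies run under a single DAG file with a single shared SQL file.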

Upvotes: 1
