Mike Altonji

Reputation: 405

How to Write a DAG with Multiple Similar Tasks

I'm trying to move data from 50 tables in Postgres to BigQuery via Airflow. Each table follows the same 4 operations, just on different data:

get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data

What's the cleanest way to repeat these operations for 50 tables?

Some things I've considered:

get_latest_timestamp_table1 >> copy_data_to_bigquery_table1 >> verify_bigquery_data_table1 >> delete_postgres_data_table1
get_latest_timestamp_table2 >> copy_data_to_bigquery_table2 >> verify_bigquery_data_table2 >> delete_postgres_data_table2
...

for table in table_names:
    get_latest_timestamp = {PythonOperator with tablename as an input}
    ...
    get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data

Any other ideas? I'm pretty new to Airflow, so I'm not sure what the best practices are for repeating similar operations.

I tried copy/pasting each task (50*4 = 200 tasks) into a single DAG. It works, but it's ugly.
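For concreteness, the loop approach above would be roughly this (just a sketch; the dag_id, the callables like get_latest_timestamp_fn, and the table list are placeholders, not working code):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

table_names = ["table1", "table2"]  # ... all 50 tables

with DAG(dag_id="postgres_to_bigquery",
         start_date=datetime(2025, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    for table in table_names:
        get_latest_timestamp = PythonOperator(
            task_id=f"get_latest_timestamp_{table}",
            python_callable=get_latest_timestamp_fn,  # placeholder callable
            op_kwargs={"table": table},
        )
        copy_data_to_bigquery = PythonOperator(
            task_id=f"copy_data_to_bigquery_{table}",
            python_callable=copy_to_bigquery_fn,  # placeholder callable
            op_kwargs={"table": table},
        )
        # ... verify_bigquery_data and delete_postgres_data built the same way
        get_latest_timestamp >> copy_data_to_bigquery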

Upvotes: 1

Views: 1907

Answers (2)

Naor Bar

Reputation: 2209

I used the same TaskGroups approach and enhanced it a bit, so you'll have:

  • one group per table
  • all of those groups nested inside one parent group

So it looks like this:

[Image: Airflow graph view showing each table's <TABLE>_sync group nested inside the processAllTables group]

The code should look like this:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="daily-sync-pipeline-dag",
         start_date=datetime(2025, 1, 1),
         schedule_interval="@hourly",
         catchup=False) as dag:

    start = BashOperator(
        task_id="start",
        ...
    )

    end = BashOperator(
        task_id="end",
        ...
    )

    processAllTables = TaskGroup(group_id='processAllTables')

    tables = ['TABLE_1', 'TABLE_2', 'TABLE_3']
    for table in tables:
        with TaskGroup(group_id=f'{table}_sync', parent_group=processAllTables):
            exportToS3 = DummyOperator(task_id=f'{table}_exportToS3')
            importToSnowflake = DummyOperator(task_id=f'{table}_importToSnowflake')
            finalTouch = DummyOperator(task_id=f'{table}_finalTouch')

            exportToS3 >> importToSnowflake >> finalTouch

            # Adding another conditional step for one specific table:
            if table == "MY_SPECIAL_TABLE":
                extraStep = DummyOperator(task_id=f'{table}_extraStep')
                finalTouch >> extraStep

    start >> processAllTables >> end
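As a side note, if you'd rather not pass parent_group explicitly, nesting the with blocks should give the same structure (a sketch using the same dummy tasks as above):

with TaskGroup(group_id='processAllTables') as processAllTables:
    for table in tables:
        with TaskGroup(group_id=f'{table}_sync'):
            exportToS3 = DummyOperator(task_id=f'{table}_exportToS3')
            importToSnowflake = DummyOperator(task_id=f'{table}_importToSnowflake')
            finalTouch = DummyOperator(task_id=f'{table}_finalTouch')

            exportToS3 >> importToSnowflake >> finalTouch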

Upvotes: 0

tomasz

Reputation: 401

To avoid code duplication you could use TaskGroups. This is described very well here.

from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id='process_tables') as process_tables:
    for table in table_names:
        get_latest_timestamp = EmptyOperator(task_id=f'{table}_timestamp')
        copy_data_to_bigquery = EmptyOperator(task_id=f'{table}_to_bq')
        .....
        get_latest_timestamp >> copy_data_to_bigquery
You can fetch XComs by including the task group prefix in the task id, like so:

process_tables.copy_data_to_bigquery
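For example, pulling an XCom from one of the tasks created in the loop above could look like this (a sketch; 'table1' is just an illustrative entry from table_names, and it assumes the real task pushes a value):

def use_latest_timestamp(ti, **kwargs):
    # Task ids inside a TaskGroup get the group_id as a prefix
    latest_ts = ti.xcom_pull(task_ids='process_tables.table1_timestamp')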

Combining the task group with other tasks would look like this:

start >> process_tables >> end

Upvotes: 2
