Reputation: 405
I'm trying to move data from 50 tables in Postgres to BigQuery via Airflow. Each table follows the same 4 operations, just on different data:
get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data
Some things I've considered:

- A separate DAG per table. The catch is that I may want table 1 to process before table 2, for example. I know I can use cross-DAG dependencies to achieve a similar effect, but I'd like to have a "main DAG" which manages these relationships.

- A single DAG with every task written out explicitly:

get_latest_timestamp_table1 >> copy_data_to_bigquery_table1 >> verify_bigquery_data_table1 >> delete_postgres_data_table1
get_latest_timestamp_table2 >> copy_data_to_bigquery_table2 >> verify_bigquery_data_table2 >> delete_postgres_data_table2
...

- Generating the tasks dynamically in a loop (sketched more fully below):

for table in table_names:
    get_latest_timestamp = {PythonOperator with tablename as an input}
    ...
    get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data
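For illustration, a minimal sketch of that loop, with hypothetical helper callables (get_latest_timestamp_for, copy_table_to_bigquery, verify_table, delete_rows_before) and placeholder DAG settings:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical callables: each receives the table name and performs one step.
def get_latest_timestamp_for(table): ...
def copy_table_to_bigquery(table): ...
def verify_table(table): ...
def delete_rows_before(table): ...

table_names = ["table1", "table2"]  # ...and the other tables

with DAG(
    dag_id="postgres_to_bigquery",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    for table in table_names:
        get_latest_timestamp = PythonOperator(
            task_id=f"get_latest_timestamp_{table}",
            python_callable=get_latest_timestamp_for,
            op_kwargs={"table": table},
        )
        copy_data_to_bigquery = PythonOperator(
            task_id=f"copy_data_to_bigquery_{table}",
            python_callable=copy_table_to_bigquery,
            op_kwargs={"table": table},
        )
        verify_bigquery_data = PythonOperator(
            task_id=f"verify_bigquery_data_{table}",
            python_callable=verify_table,
            op_kwargs={"table": table},
        )
        delete_postgres_data = PythonOperator(
            task_id=f"delete_postgres_data_{table}",
            python_callable=delete_rows_before,
            op_kwargs={"table": table},
        )
        get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data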
Any other ideas? I'm pretty new to Airflow, so I'm not sure what the best practices are for repeating similar operations.
I tried copy/pasting each task (50*4=200 tasks) into a single DAG. It works, but it's ugly.
Upvotes: 1
Views: 1907
Reputation: 2209
I used the same TaskGroups approach and enhanced it a bit: each table gets its own TaskGroup, all nested under a parent processAllTables group, with explicit start and end tasks and an optional extra step for specific tables.
The code should look like this:
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="daily-sync-pipeline-dag",
         start_date=datetime(2025, 1, 1),
         schedule_interval="@hourly",
         catchup=False) as dag:

    start = BashOperator(
        task_id="start",
        # ...
    )
    end = BashOperator(
        task_id="end",
        # ...
    )

    # Parent group that holds one nested TaskGroup per table
    processAllTables = TaskGroup(group_id='processAllTables')

    tables = ['TABLE_1', 'TABLE_2', 'TABLE_3']
    for table in tables:
        with TaskGroup(group_id=f'{table}_sync', parent_group=processAllTables):
            exportToS3 = DummyOperator(task_id=f'{table}_exportToS3')
            importToSnowflake = DummyOperator(task_id=f'{table}_importToSnowflake')
            finalTouch = DummyOperator(task_id=f'{table}_finalTouch')

            exportToS3 >> importToSnowflake >> finalTouch

            # Adding another conditional step:
            if table == "MY_SPECIAL_TABLE":
                extraStep = DummyOperator(task_id=f'{table}_extraStep')
                finalTouch >> extraStep

    start >> processAllTables >> end
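Note that with TaskGroup's default prefix_group_id=True, the nested groups prefix the task ids, so the export task for TABLE_1 ends up as processAllTables.TABLE_1_sync.TABLE_1_exportToS3, and start >> processAllTables >> end wires start and end to every table's chain in one line.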
Upvotes: 0
Reputation: 401
To avoid code replication you could use TaskGroups; they are very well described in the Airflow documentation.
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# Create the group once and loop inside it; creating it inside the loop
# would reuse the same group_id on every iteration.
with TaskGroup(group_id='process_tables') as process_tables:
    for table in table_names:
        get_latest_timestamp = EmptyOperator(task_id=f'{table}_timestamp')
        copy_data_to_bigquery = EmptyOperator(task_id=f'{table}_to_bq')
        ...
        get_latest_timestamp >> copy_data_to_bigquery
You can fetch XComs by also including the task group id as a prefix on the task id, like so:
process_tables.copy_data_to_bigquery
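For instance, pulling that value from a downstream task might look like the following sketch (confirm_load is a hypothetical task; it assumes the copy step actually pushes something to XCom, which the EmptyOperator placeholders above do not):

from airflow.operators.python import PythonOperator

def confirm_load(ti, table):
    # Task ids inside the group carry the group_id prefix,
    # e.g. 'process_tables.table1_to_bq'
    rows_loaded = ti.xcom_pull(task_ids=f'process_tables.{table}_to_bq')
    print(f'{table}: {rows_loaded} rows loaded')

confirm = PythonOperator(
    task_id='confirm_load',
    python_callable=confirm_load,
    op_kwargs={'table': 'table1'},
)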
Combining the task group with other tasks would look like this:
start >> process_tables >> end
Upvotes: 2