Reputation: 1014
Below is the dynamic dag creation for each table. I need to pass Table name for the load_table
, So the task can be seen as load_table_A
in DAG edw_table_A
and load_table_B
in the DAG edw_table_B
import datetime
import os
from functools import partial
from datetime import timedelta
from airflow.models import DAG,Variable
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from alerts.email_operator import dag_failure_email
def get_db_dag(
*,
dag_id,
start_date,
schedule_interval,
max_taskrun,
max_dagrun,
proc_nm,
load_sql
):
default_args = {
'owner': 'airflow',
'start_date': start_date,
'provide_context': True,
'execution_timeout': timedelta(minutes=max_taskrun),
'email_on_retry': False,
}
dag = DAG(
dag_id=dag_id,
schedule_interval=schedule_interval,
dagrun_timeout=timedelta(hours=max_dagrun),
template_searchpath=tmpl_search_path,
default_args=default_args,
max_active_runs=1,
catchup='{{var.value.dag_catchup}}',
on_failure_callback='email',
)
load_table = SnowflakeOperator(
task_id='load_table',
sql=load_sql,
snowflake_conn_id=CONN_ID,
autocommit=True,
dag=dag,
)
load_table
return dag
# ======== DAG DEFINITIONS #
edw_table_A = get_db_dag(
dag_id='edw_table_A',
start_date=datetime.datetime(2020, 5, 21),
schedule_interval='0 5 * * *',
max_taskrun=3, # Minutes
max_dagrun=1, # Hours
load_sql='extract_A.sql',
)
edw_table_B = get_db_dag(
dag_id='edw_table_B',
start_date=datetime.datetime(2020, 5, 21),
schedule_interval='0 5 * * *',
max_taskrun=3, # Minutes
max_dagrun=1, # Hours
load_sql='extract_B.sql',
)
Upvotes: 1
Views: 534
Reputation: 11607
For one since you are generating different DAG
s for different tables already, adding table-name to task_id
(also) is not required.
But of course if you want to do it, you could do away with simple python
string concatenation by adding a table_name
param in your get_db_dag(..)
function
def get_db_dag(
*, # what is this?
table_name, # replace dag_id param with just table_name param
start_date,
schedule_interval,
max_taskrun,
max_dagrun,
proc_nm # remove load_sql param too (it is also redundant)
):
..
dag = DAG(
dag_id=f"edw_table_{table_name}", # python 3+ string-interpolation
schedule_interval=schedule_interval,
..
)
load_table = SnowflakeOperator(
task_id=f"load_table_{table_name}", # python 3+ string-interpolation
sql=f"extract_{table_name}.sql",
snowflake_conn_id=CONN_ID,
..
)
load_table # what is this meant for? (it is redundant)
return dag
Then you can call the above function as
edw_table_A = get_db_dag(
table_name='A',
start_date=datetime.datetime(2020, 5, 21),
schedule_interval='0 5 * * *',
max_taskrun=3, # Minutes
max_dagrun=1, # Hours
)
Upvotes: 1