Kar

Reputation: 1014

How do I pass the task_id name dynamically in dynamic DAG creation?

Below is my dynamic DAG creation for each table. I need to pass the table name into the `load_table` task, so that the task appears as load_table_A in DAG edw_table_A and load_table_B in DAG edw_table_B.

    import datetime
    import os
    from functools import partial
    from datetime import timedelta
    from airflow.models import DAG,Variable
    from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
    from alerts.email_operator import dag_failure_email

    def get_db_dag(
        *,
        dag_id,
        start_date,
        schedule_interval,
        max_taskrun,
        max_dagrun,
        proc_nm,
        load_sql
    ):

        default_args = {
            'owner': 'airflow',
            'start_date': start_date,
            'provide_context': True,
            'execution_timeout': timedelta(minutes=max_taskrun),
            'email_on_retry': False,
        }


        dag = DAG(
            dag_id=dag_id,
            schedule_interval=schedule_interval,
            dagrun_timeout=timedelta(hours=max_dagrun),
            template_searchpath=tmpl_search_path,
            default_args=default_args,
            max_active_runs=1,
        catchup=Variable.get('dag_catchup', default_var=False, deserialize_json=True),
        on_failure_callback=dag_failure_email,
        )


        load_table = SnowflakeOperator(
            task_id='load_table',
            sql=load_sql,
            snowflake_conn_id=CONN_ID,
            autocommit=True,
            dag=dag,
        )

        load_table

        return dag

    # ======== DAG DEFINITIONS #

    edw_table_A = get_db_dag(
        dag_id='edw_table_A',
        start_date=datetime.datetime(2020, 5, 21),
        schedule_interval='0 5 * * *',
        max_taskrun=3,  # Minutes
        max_dagrun=1,  # Hours
        load_sql='extract_A.sql',
    )

    edw_table_B = get_db_dag(
        dag_id='edw_table_B',
        start_date=datetime.datetime(2020, 5, 21),
        schedule_interval='0 5 * * *',
        max_taskrun=3,  # Minutes
        max_dagrun=1,  # Hours
        load_sql='extract_B.sql',
    )

Upvotes: 1

Views: 534

Answers (1)

y2k-shubham

Reputation: 11607

For one, since you are already generating a different DAG for each table, adding the table name to the task_id as well is not strictly required.


But of course, if you want to do it, you can achieve it with simple Python string interpolation by adding a table_name param to your get_db_dag(..) function:

    def get_db_dag(
        *,                 # bare *: every parameter after it is keyword-only
        table_name,        # replaces the dag_id param
        start_date,
        schedule_interval,
        max_taskrun,
        max_dagrun,
        proc_nm            # the load_sql param is removed too (it can be derived from table_name)
    ):

        ..

        dag = DAG(
            dag_id=f"edw_table_{table_name}",            # Python 3.6+ f-string
            schedule_interval=schedule_interval,
            ..
        )

        load_table = SnowflakeOperator(
            task_id=f"load_table_{table_name}",           # Python 3.6+ f-string
            sql=f"extract_{table_name}.sql",
            snowflake_conn_id=CONN_ID,
            ..
        )

        # note: the bare `load_table` expression in the original is a no-op and can be dropped

        return dag
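On the `*` in the signature: a bare `*` is Python's keyword-only marker, so every parameter after it must be passed by name. A minimal standalone sketch (hypothetical `make_dag_id` helper, no Airflow required):

```python
def make_dag_id(*, table_name):
    # The bare * forces callers to pass table_name by keyword,
    # keeping call sites like get_db_dag(table_name='A', ...) explicit.
    return f"edw_table_{table_name}"

print(make_dag_id(table_name='A'))   # edw_table_A
# make_dag_id('A')                   # TypeError: takes 0 positional arguments
```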

Then you can call the above function as

    edw_table_A = get_db_dag(
        table_name='A',
        start_date=datetime.datetime(2020, 5, 21),
        schedule_interval='0 5 * * *',
        max_taskrun=3,  # Minutes
        max_dagrun=1,  # Hours
    )
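With every identifier derived from the single table name, the same pattern scales to any number of tables. A hypothetical `build_names` helper (an illustration of this naming scheme, not part of the original code; no Airflow required) shows the derivation:

```python
def build_names(table_name):
    # Derive dag_id, task_id and SQL file name from one table name.
    return {
        'dag_id': f"edw_table_{table_name}",
        'task_id': f"load_table_{table_name}",
        'sql': f"extract_{table_name}.sql",
    }

for t in ('A', 'B'):
    names = build_names(t)
    # In the real DAG file you would instead call
    # get_db_dag(table_name=t, ...) and register the result with
    # globals()[names['dag_id']] = dag so Airflow's DagBag picks it up.
    print(names['dag_id'], names['task_id'], names['sql'])
```

Registering each generated DAG object in `globals()` at module level is the standard way to make dynamically built DAGs discoverable by the scheduler.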

Upvotes: 1
