Srinivas

Reputation: 2098

How can we create three tasks in Airflow for multiple environments?

I have three tasks: 1. AddEMRStep, 2. Sensor, 3. SQLstep. I want them created for two environments.

with dag:
    run_this_task = PythonOperator(
        task_id = "run_this",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    run_this_task2 = PythonOperator(
        task_id = "run_this2",
        python_callable=run_this_func,
        provide_context=True
    )

    run_this_task >> run_this_task2

Now I need to create these tasks in the DAG for multiple environments.

I am trying to do something like this:

envs = ["stg","prod"]

How can I use a for loop to produce something like this?

with dag:
    start = DummyOperator(task_id="start")  # upstream anchor referenced below
    run_this_task_stg = PythonOperator(
        task_id = "run_this_task_stg",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    run_this_task2_stg = PythonOperator(
        task_id = "run_this_task2_stg",
        python_callable=run_this_func,
        provide_context=True
    )

    run_this_task_prod = PythonOperator(
        task_id = "run_this_task_prod",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    run_this_task2_prod = PythonOperator(
        task_id = "run_this_task2_prod",
        python_callable=run_this_func,
        provide_context=True
    )

    start >> run_this_task_stg >> run_this_task2_stg 
    start >> run_this_task_prod >> run_this_task2_prod

Upvotes: 1

Views: 1322

Answers (1)

Josh Fell

Reputation: 3589

Absolutely! Try something like this:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def push_to_xcom():
    ...

def run_this_func():
    ...

dag = DAG(dag_id="loops", start_date=datetime(2022, 1, 1), schedule_interval=None)
envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")
    
    for env in envs:
        run_this_task = PythonOperator(
            task_id=f"run_this_task_{env}",
            python_callable=push_to_xcom,
            retries=10,
            retry_delay=timedelta(seconds=1),
        )

        run_this_task2 = PythonOperator(
            task_id=f"run_this_task2_{env}",
            python_callable=run_this_func,
        )

        start >> run_this_task >> run_this_task2

Graph View (screenshot of the resulting DAG)

Or with the TaskFlow API:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator


dag = DAG(dag_id="loops", start_date=datetime(2022, 1, 1), schedule_interval=None)
envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")

    for env in envs:

        @task(task_id=f"run_this_task_{env}", retries=10, retry_delay=timedelta(seconds=1))
        def push_to_xcom():
            ...

        @task(task_id=f"run_this_task2_{env}")
        def run_this_func():
            ...

        start >> push_to_xcom() >> run_this_func()
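The naming scheme both versions rely on can be checked outside Airflow with plain Python. This is a minimal sketch, assuming the `envs` list and the two base task names from the question; it only demonstrates that the f-string loop yields one unique `task_id` per (task, environment) pair, which is what lets the loop register several tasks in a single DAG without ID collisions:

```python
from itertools import product

envs = ["stg", "prod"]
base_ids = ["run_this_task", "run_this_task2"]

# Each (env, base) pair yields a distinct task_id.
task_ids = [f"{base}_{env}" for env, base in product(envs, base_ids)]
print(task_ids)
# ['run_this_task_stg', 'run_this_task2_stg',
#  'run_this_task_prod', 'run_this_task2_prod']
```

Duplicate task IDs in one DAG raise a `DuplicateTaskIdFound` error at parse time, so keeping the environment suffix in every generated ID is required, not just cosmetic.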

Upvotes: 2
