Reputation: 2098
I have three tasks: 1. AddEMRStep, 2. Sensor, 3. SQLstep. I want them to be created for two environments.
# Single-environment version: two PythonOperator tasks chained inside the DAG.
# `dag`, `push_to_xcom` and `run_this_func` are defined elsewhere (not shown).
with dag:
    run_this_task = PythonOperator(
        task_id="run_this",
        python_callable=push_to_xcom,
        # NOTE(review): provide_context looks like the Airflow 1.x flag; Airflow 2
        # passes the context automatically -- confirm the target version.
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1),
    )
    run_this_task2 = PythonOperator(
        task_id="run_this2",
        python_callable=run_this_func,
        provide_context=True,
    )
    # run_this2 is downstream of run_this.
    run_this_task >> run_this_task2
Now I need to create these tasks for multiple environments.
I am trying to do something like this:
# Environments that each need their own copy of the task chain.
envs = ["stg", "prod"]
How can I use a for loop to make it like this?
# Desired result, written out by hand: the same two-task chain once per
# environment, with the environment name appended to every task_id.
# `dag`, `push_to_xcom` and `run_this_func` are defined elsewhere (not shown).
with dag:
    # --- stg ---
    run_this_task_stg = PythonOperator(
        task_id="run_this_task_stg",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1),
    )
    run_this_task2_stg = PythonOperator(
        task_id="run_this_task2_stg",
        python_callable=run_this_func,
        provide_context=True,
    )

    # --- prod ---
    run_this_task_prod = PythonOperator(
        task_id="run_this_task_prod",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1),
    )
    run_this_task2_prod = PythonOperator(
        task_id="run_this_task2_prod",
        python_callable=run_this_func,
        provide_context=True,
    )

    # NOTE(review): `start` is not defined in this snippet; presumably a shared
    # upstream task that both environment chains fan out from.
    start >> run_this_task_stg >> run_this_task2_stg
    start >> run_this_task_prod >> run_this_task2_prod
Upvotes: 1
Views: 1322
Reputation: 3589
Absolutely! Try something like this:
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def push_to_xcom():
    # Placeholder body; the real callable goes here.
    ...


def run_this_func():
    # Placeholder body; the real callable goes here.
    ...


dag = DAG(dag_id="loops", start_date=datetime(2022, 1, 1), schedule_interval=None)
envs = ["stg", "prod"]

with dag:
    # Single shared entry point that every per-environment chain hangs off.
    start = DummyOperator(task_id="start")

    for env in envs:
        # The env suffix gives each iteration its own task_id within the DAG.
        run_this_task = PythonOperator(
            task_id=f"run_this_task_{env}",
            python_callable=push_to_xcom,
            retries=10,
            retry_delay=timedelta(seconds=1),
        )
        run_this_task2 = PythonOperator(
            task_id=f"run_this_task2_{env}",
            python_callable=run_this_func,
        )

        # Must stay inside the loop: the variables are rebound on every
        # iteration, so wiring after the loop would only connect the last env.
        start >> run_this_task >> run_this_task2
Or with the TaskFlow API:
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator

dag = DAG(dag_id="loops", start_date=datetime(2022, 1, 1), schedule_interval=None)
envs = ["stg", "prod"]

with dag:
    # Single shared entry point that every per-environment chain hangs off.
    start = DummyOperator(task_id="start")

    for env in envs:
        # The functions are redefined on each iteration; the explicit task_id
        # with the env suffix is what keeps the resulting tasks distinct.
        @task(task_id=f"run_this_task_{env}", retries=10, retry_delay=timedelta(seconds=1))
        def push_to_xcom():
            # Placeholder body; the real task logic goes here.
            ...

        @task(task_id=f"run_this_task2_{env}")
        def run_this_func():
            # Placeholder body; the real task logic goes here.
            ...

        # Calling the decorated functions creates the tasks; wire them inside
        # the loop so each environment gets its own start -> first -> second chain.
        start >> push_to_xcom() >> run_this_func()
Upvotes: 2