Reputation: 43
I have parallel execution of 2 tasks below in my DAG. In the real world these could be 15 or 20 tasks, with the input parameters coming from an array, as shown below.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# default_args is assumed to be defined elsewhere
fruits = ["apples", "bananas"]

bad_dag = DAG('bad_dag_3', default_args=default_args, schedule_interval=None)

t0 = BashOperator(
    task_id="print",
    bash_command='echo "Beginning parallel tasks next..." ',
    dag=bad_dag)

t1 = BashOperator(
    task_id="fruit_" + fruits[0],
    params={"fruits": fruits},
    bash_command='echo fruit= {{params.fruits[0]}} ',
    dag=bad_dag)

t2 = BashOperator(
    task_id="fruit_" + fruits[1],
    params={"fruits": fruits},
    bash_command='echo fruit= {{params.fruits[1]}} ',
    dag=bad_dag)

t0 >> [t1, t2]
What's the best way for me to write this DAG, so I don't have to re-write the same BashOperator over and over again like I have above?
I cannot use a loop, because I cannot parallelize the tasks if I use a loop.
Upvotes: 2
Views: 1685
Reputation: 18884
Use the DAG below. The idea is that the task_id for each task should be unique; Airflow will handle the rest. Creating tasks in a loop does not make them run sequentially: the loop only builds the DAG, and parallelism is determined by the dependencies you set, not by how the tasks are constructed in Python.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

fruits = ["apples", "bananas"]

bad_dag = DAG('bad_dag_3', default_args=default_args, schedule_interval=None)

t0 = BashOperator(
    task_id="print",
    bash_command='echo "Beginning parallel tasks next..." ',
    dag=bad_dag)

for fruit in fruits:
    # One task per fruit; the unique task_id keeps them distinct.
    task_t = BashOperator(
        task_id="fruit_" + fruit,
        params={"fruit": fruit},
        bash_command='echo fruit= {{params.fruit}} ',
        dag=bad_dag)
    # Every fruit task depends only on t0, so they all run in parallel.
    t0 >> task_t
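If you prefer the explicit fan-out form t0 >> [t1, t2] from the original DAG, you can collect the generated tasks in a list and set the dependency in one statement. A minimal sketch of the same pattern (the name fruit_tasks is just illustrative):

fruit_tasks = [
    BashOperator(
        task_id="fruit_" + fruit,
        params={"fruit": fruit},
        bash_command='echo fruit= {{params.fruit}} ',
        dag=bad_dag)
    for fruit in fruits
]
# Equivalent to t0 >> [t1, t2, ...] for any number of fruits.
t0 >> fruit_tasks

Either way, how many of these tasks actually run at the same time is governed by your executor and the parallelism settings in airflow.cfg, not by how the tasks were created in Python.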
Upvotes: 2