user1967397

Reputation: 181

Airflow: Generate Dynamic Tasks in a Single DAG, Where Task N+1 Depends on Task N

When generating tasks dynamically, I need Task 2 to depend on Task 1, i.e. task1 >> task2 or task2.set_upstream(task1).

Since the task_ids are evaluated up front (or seem to be), I cannot set the dependency in advance. Any help would be appreciated.

The Component(i) tasks generate fine, except that they all run at once.

for i in range(1, 10):
    task_id = 'Component' + str(i)
    task_id = BashOperator(
        task_id='Component' + str(i),
        bash_command="echo  {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i),
        xcom_push=True,
        dag=dag)
    ?????.set_upstream(??????)

Upvotes: 17

Views: 19676

Answers (3)

kaxil

Reputation: 18894

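For Airflow >= 2.3

You can use the Dynamic Task Mapping feature, which supports dynamically generated tasks natively. Note that mapped task instances are not ordered relative to one another, so mapping alone does not make instance N+1 wait for instance N.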

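from airflow.operators.bash import BashOperator

# One mapped task instance is created per entry in bash_command.
BashOperator.partial(task_id="Component", do_xcom_push=True).expand(
    bash_command=[
        "echo  {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i)
        for i in range(0, 10)
    ]
)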

For Airflow < 2.3

Use the following code:

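a = []
for i in range(0, 10):
    a.append(BashOperator(
        task_id='Component' + str(i),
        bash_command="echo  {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i),
        xcom_push=True,  # Airflow 1.x name; on Airflow 2.x use do_xcom_push=True
        dag=dag))
    # Every task except the first runs after the one created just before it.
    if i > 0:
        a[i-1] >> a[i]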

Using a DummyOperator, the code looks like:

a = []
for i in range(0,10):
    a.append(DummyOperator(
        task_id='Component'+str(i),
        dag=dag))
    if i > 0:
        a[i-1] >> a[i]

This generates a linear DAG in which each task runs after the previous one: Component0 >> Component1 >> ... >> Component9.

Upvotes: 30

LYu

Reputation: 2426

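Use chain, unpacking the list so that each task is passed as a separate argument:

# Airflow 1.10 import path; on Airflow 2.x use:
# from airflow.models.baseoperator import chain
from airflow.utils.helpers import chain

ops = []
for i in range(0, 10):
    ops.append(DummyOperator(
        task_id=f"Component_{i}",
        dag=dag))

# chain(a, b, c) sets a >> b >> c, so the list must be unpacked.
chain(*ops)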
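
To see why chain wants the tasks as separate positional arguments, here is a minimal sketch of the pairwise linking it performs. This is a simplified model, not Airflow's implementation: `Task` and `chain_sketch` are hypothetical stand-ins written only for illustration, so the snippet runs without Airflow installed.

```python
class Task:
    """Hypothetical stand-in for an operator; records downstream task_ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other.task_id)


def chain_sketch(*tasks):
    """Simplified model of chain: link each positional argument to the next."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream.set_downstream(downstream)


# Unpacked: ten positional arguments, so nine links are created.
ops = [Task(f"Component_{i}") for i in range(10)]
chain_sketch(*ops)

# Not unpacked: the whole list is one argument, so there is nothing to pair.
unlinked = [Task(f"Other_{i}") for i in range(3)]
chain_sketch(unlinked)
```

In this model, passing the list itself creates no links at all, which matches the symptom of tasks still running at once.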

Upvotes: 0

Viraj Parekh

Reputation: 1381

You can follow a pattern like this:

with dag:

    d1 = DummyOperator(task_id='kick_off_dag')

    for i in range(0, 5):
        d2 = DummyOperator(task_id='generate_data_{0}'.format(i))
        d1 >> d2

This will generate 5 tasks, each directly downstream of d1. They fan out from d1 rather than forming a chain.
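
If the goal is for each generated task to wait for the one before it, as the question asks, one option is to keep a reference to the most recently created task and link to that instead of always linking to d1. The sketch below shows only the loop logic. `Task` is a hypothetical stand-in that models the `>>` operator, not an Airflow class; the assumption is that the same loop carries over to real operators, since they also support `>>`.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; only models `>>`."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that run after this task

    def __rshift__(self, other):
        # Same convention as operators: `a >> b` means b runs after a.
        self.downstream.append(other.task_id)
        return other


def build_chain(n):
    """Link kick_off_dag >> generate_data_0 >> ... >> generate_data_{n-1}."""
    start = Task("kick_off_dag")
    tasks = [start]
    previous = start
    for i in range(n):
        current = Task("generate_data_{0}".format(i))
        previous >> current  # current waits for the task created before it
        previous = current   # the next iteration links to this task
        tasks.append(current)
    return tasks


tasks = build_chain(5)
edges = {t.task_id: t.downstream for t in tasks}
```

Each task ends up with at most one downstream task, so the graph is a single line instead of a fan.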

Upvotes: 9
