Reputation: 181
When generating tasks dynamically, I need Task 2 to depend on Task 1, i.e. Task1 >> Task2 or task2.set_upstream(task1).
Since the task_ids are evaluated up front (or seem to be), I cannot set the dependency in advance. Any help would be appreciated.
The Component(i) tasks generate fine, except that they all run at once.
for i in range(1,10):
    task_id='Component'+str(i)
    task_id = BashOperator(
        task_id='Component'+str(i),
        bash_command="echo {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i),
        xcom_push=True,
        dag=dag)
    ?????.set_upstream(??????)
Upvotes: 17
Views: 19676
Reputation: 18894
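For Airflow >= 2.3
You can use the Dynamic Task Mapping feature, which supports dynamically generated tasks natively. Note that mapped task instances are independent of one another, so this alone does not make them run one after another: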
BashOperator.partial(task_id="Component", do_xcom_push=True).expand(
    bash_command=[
        "echo {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i)
        for i in range(0, 10)
    ]
)
For Airflow<2.3
Use the following code:
a = []
for i in range(0,10):
    a.append(BashOperator(
        task_id='Component'+str(i),
        bash_command="echo {{ ti.xcom_pull(task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i),
        xcom_push=True,  # this argument is do_xcom_push in Airflow 2.x
        dag=dag))
    # Make each task run after the one created just before it
    if i > 0:
        a[i-1] >> a[i]
Using a DummyOperator, the code looks like:
a = []
for i in range(0,10):
    a.append(DummyOperator(
        task_id='Component'+str(i),
        dag=dag))
    if i > 0:
        a[i-1] >> a[i]
This would generate a DAG in which the tasks run sequentially: Component0 >> Component1 >> ... >> Component9.
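The loop above indexes back into the list to find the previous task. A variant that simply remembers the last task created might look like the sketch below. It is deliberately Airflow-free so it can be run anywhere: `FakeTask` and `build_sequential` are hypothetical names, and `FakeTask` only imitates the `>>` operator by recording upstream task ids. With real operators, the same loop body should apply.

```python
class FakeTask:
    """Hypothetical stand-in for an Airflow operator; it only models `>>`."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = []

    def __rshift__(self, other):
        # Imitate `a >> b`: b records a as upstream, and b is returned
        other.upstream_ids.append(self.task_id)
        return other


def build_sequential(n):
    """Create n tasks where each one depends on the task created before it."""
    tasks = []
    previous = None
    for i in range(n):
        current = FakeTask(f"Component{i}")
        if previous is not None:
            previous >> current  # the first task has no upstream
        previous = current
        tasks.append(current)
    return tasks


if __name__ == "__main__":
    for task in build_sequential(4):
        print(task.task_id, "<-", task.upstream_ids)
```

This fills in the `?????.set_upstream(??????)` line from the question: the task from the previous iteration is the upstream, and the first iteration is skipped.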
Upvotes: 30
Reputation: 2426
Use chain
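from airflow.utils.helpers import chain  # in Airflow 2.x: from airflow.models.baseoperator import chain
ops = []
for i in range(0,10):
    ops.append(DummyOperator(
        task_id=f"Component_{i}",
        dag=dag))
# chain takes the tasks as separate arguments, so unpack the list
chain(*ops)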
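To see why the list has to be passed to chain as separate arguments, here is a simplified, Airflow-free sketch of the pairwise linking a chain-style helper performs. `chain_sketch` is a hypothetical name and not Airflow's implementation; it returns the (upstream, downstream) pairs instead of setting real dependencies.

```python
def chain_sketch(*tasks):
    """Simplified stand-in for a chain-style helper: pair up neighbouring arguments."""
    links = []
    # zip(tasks, tasks[1:]) yields (first, second), (second, third), ...
    for upstream, downstream in zip(tasks, tasks[1:]):
        links.append((upstream, downstream))
    return links


ops = [f"Component_{i}" for i in range(4)]

if __name__ == "__main__":
    # Unpacked: every task is its own argument, so neighbours get linked
    print(chain_sketch(*ops))
    # Not unpacked: the whole list is one argument, so there is nothing to pair
    print(chain_sketch(ops))
```

In this sketch, passing the list without unpacking produces no links at all, which is the failure mode to watch for when tasks are collected in a list first.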
Upvotes: 0
Reputation: 1381
You can follow a pattern like this:
with dag:
    d1 = DummyOperator(task_id='kick_off_dag')
    for i in range(0, 5):
        d2 = DummyOperator(task_id='generate_data_{0}'.format(i))
        d1 >> d2
This will generate 5 tasks directly downstream of d1; they depend only on d1, not on each other, so they can run in parallel once d1 finishes.
Upvotes: 9