Prateek S

Reputation: 81

Airflow: how to set task dependencies for independent static tasks after a loop

I have a for loop that creates some intermediate tasks, and I also have some tasks that should run after the loop.

I am setting the task dependencies inside the for loop only, as suggested in many posts.

Example:

individual_task1 = SSHOperator(task_id='tk_one', ...)
individual_task2 = SSHOperator(task_id='tk_two', ...)
individual_task3 = SSHOperator(task_id='tk_three', ...)

for i in [val1, val2, val3, val4, ..., valn]:
    first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
    second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2 >> individual_task3

But for individual_task2 and individual_task3 I get this error:

Broken DAG, task_id already registered.

Those are individual tasks that are not defined in the loop, so why am I getting this error? Am I doing something wrong?

Upvotes: 0

Views: 647

Answers (1)

lealvcon

Reputation: 158

Try this:

individual_task1 = SSHOperator(task_id='tk_one', ...)
individual_task2 = SSHOperator(task_id='tk_two', ...)
individual_task3 = SSHOperator(task_id='tk_three', ...)

for i in [val1, val2, val3, val4, ..., valn]:
    first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
    second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
    # Only the per-iteration links are set inside the loop.
    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2

# The link between the two static tasks is set once, after the loop.
individual_task2 >> individual_task3

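Airflow is probably complaining because you are setting the same dependency many times: with individual_task2 >> individual_task3 inside the loop, that link is declared again on every iteration. Declaring it once, outside the loop, avoids the repetition.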
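
To see why it matters where the last link is set, here is a minimal sketch in plain Python. The Task class, DuplicateDependencyError, and build function are hypothetical stand-ins written for this illustration, not Airflow's own classes, and the sketch assumes that a dependency declared a second time is rejected.

```python
class DuplicateDependencyError(Exception):
    """Hypothetical error raised when the same link is declared twice."""


class Task:
    """Toy stand-in for an operator; not Airflow's real implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Assumption: an already declared link is refused.
        if other in self.downstream:
            raise DuplicateDependencyError(
                f"Dependency {self.task_id} -> {other.task_id} already registered"
            )
        self.downstream.append(other)
        # Returning the right-hand task is what lets a >> b >> c chain.
        return other


def build(values, tail_inside_loop):
    """Wire tk_one -> comp_x -> stats_x -> tk_two -> tk_three for each value."""
    start, join, end = Task("tk_one"), Task("tk_two"), Task("tk_three")
    for v in values:
        comp = Task("comp_" + v)
        stats = Task("stats_" + v)
        if tail_inside_loop:
            # join >> end is re-declared on every iteration.
            start >> comp >> stats >> join >> end
        else:
            start >> comp >> stats >> join
    if not tail_inside_loop:
        # Declared exactly once, after the loop.
        join >> end
    return start, join, end
```

Calling build(["a", "b"], tail_inside_loop=True) fails on the second iteration in this model, because tk_two >> tk_three was already declared on the first one. With tail_inside_loop=False every link is declared once and the wiring succeeds.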

Upvotes: 1
