Reputation: 81
I have a for loop with some intermediate tasks inside it, plus a task before the loop and some tasks after it.
I am setting the task dependencies inside the for loop only, as suggested in many posts.
Example:
individual_task1 = SSHOperator(task_id='tk_one', ...)
individual_task2 = SSHOperator(task_id='tk_two', ...)
individual_task3 = SSHOperator(task_id='tk_three', ...)
for i in [val1, val2, val3, val4, ..., valn]:
    first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
    second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2 >> individual_task3
But for individual_task2 and individual_task3 I get this error:
Broken DAG: task_id already registered
Those are individual tasks that are not defined in the loop, so why am I getting this error? Am I doing something wrong?
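To see which dependency the loop above repeats, here is a minimal sketch in plain Python (no Airflow needed) that records every upstream >> downstream edge the chain creates, using the task_ids as stand-ins for the operators. The values in loop_values are hypothetical placeholders for val1 ... valn.

```python
from collections import Counter

# Hypothetical placeholders for val1 ... valn from the question.
loop_values = ["a", "b", "c"]

edges = Counter()
for i in loop_values:
    # The same chain as in the question, as a list of task_ids:
    # individual_task1 >> first_task_in_loop >> second_task_in_loop
    #     >> individual_task2 >> individual_task3
    chain = ["tk_one", "comp_" + i, "stats_" + i, "tk_two", "tk_three"]
    # Each adjacent pair in the chain is one (upstream, downstream) edge.
    for upstream, downstream in zip(chain, chain[1:]):
        edges[(upstream, downstream)] += 1

# Keep only the edges that were registered more than once.
repeated = {edge: n for edge, n in edges.items() if n > 1}
print(repeated)  # {('tk_two', 'tk_three'): 3}
```

Every other edge involves a task_id that is unique to its iteration (comp_a, stats_a, ...), so only the edge between the two tasks defined outside the loop is registered once per iteration.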
Upvotes: 0
Views: 647
Reputation: 158
Try this:
individual_task1 = SSHOperator(task_id='tk_one', ...)
individual_task2 = SSHOperator(task_id='tk_two', ...)
individual_task3 = SSHOperator(task_id='tk_three', ...)
for i in [val1, val2, val3, val4, ..., valn]:
    first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
    second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2
individual_task2 >> individual_task3
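Airflow is probably complaining because you set the same dependency many times: individual_task2 >> individual_task3 sits at the end of the chain inside the loop, so it is registered again on every iteration. Setting it once, after the loop, registers it only once.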
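A minimal sketch of why moving that line out of the loop helps. It uses a hypothetical Task class (not Airflow's API) whose >> raises when the same dependency is registered twice; that strictness is an assumption chosen to mimic the error in the question. The loop values are placeholders.

```python
class DuplicateDependency(Exception):
    """Raised by the hypothetical Task class on a repeated edge."""


class Task:
    """Hypothetical stand-in for an operator; not Airflow's API."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # Assumption: registering an existing edge again is an error.
        if other.task_id in self.downstream:
            raise DuplicateDependency(
                f"Dependency {self.task_id} >> {other.task_id} already registered"
            )
        self.downstream.add(other.task_id)
        # Returning the right operand lets a >> b >> c chain left to right.
        return other


def build(values, fixed):
    t1, t2, t3 = Task("tk_one"), Task("tk_two"), Task("tk_three")
    for i in values:
        first = Task("comp_" + i)
        second = Task("stats_" + i)
        if fixed:
            t1 >> first >> second >> t2
        else:
            # Question's version: t2 >> t3 is repeated on every iteration.
            t1 >> first >> second >> t2 >> t3
    if fixed:
        t2 >> t3  # answer's version: set once, after the loop
    return t1, t2, t3


try:
    build(["a", "b"], fixed=False)
except DuplicateDependency as exc:
    print("original:", exc)  # original: Dependency tk_two >> tk_three already registered

t1, t2, t3 = build(["a", "b"], fixed=True)
print(sorted(t1.downstream), sorted(t2.downstream))  # ['comp_a', 'comp_b'] ['tk_three']
```

In the fixed version, each stats_i >> tk_two edge has a different upstream task, so nothing inside the loop is registered twice.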
Upvotes: 1