Reputation: 567
I am using trigger_rule="all_success" in my workflow so that the next task is triggered only after all of its parent tasks have succeeded. That is not what happens.
Job flow:
Task 1 >> Task 2a, Task 2b, Task 2c (run in parallel) >> Task C
Scenario 1 - One of the parallel tasks (e.g. Task 2a) goes into up_for_retry, yet Task C still gets executed.
Scenario 2 - One of the parallel tasks (e.g. Task 2a) is still in running status, yet Task C still gets executed.
Note - trigger_rule is set to all_success everywhere.
Ideally, Task C should not be triggered until Tasks 2a, 2b and 2c have all completed successfully.
query_template_dict = {
    'partner_list': ['val1', 'val2'],
    'google_project': 'project_name',
    'queries': {
        'layer3': {
            'template': 'temp.sql'
        }
    },
    'applicable_tasks': {
        'val1': {
            'table_layer3': ['activity']
        },
        'val2': {
            'table_layer3': ['activity'],
        }
    }
}
for partner in query_template_dict['partner_list']:
    # Loop over applicable report queries for a partner
    applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
    for task in applicable_tasks:
        query_params = [
            {
                "name": "col1",
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": col1}
            }
        ]
        run_bq_cmd = BigQueryOperator(
            task_id=partner + '-' + task,
            trigger_rule='all_success',
            allow_large_results=True,
            dag=dag
        )
        # Creating dependency on previous tasks
        run_dummy >> run_bq_cmd
        for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:
            run_sub_task = BashOperator(
                task_id=partner + '_' + task + '_' + sub_tasks,
                bash_command=bash_command,
                trigger_rule='all_success',
                dag=dag
            )
            run_bq_cmd >> run_sub_task
bash_command = <some bash command>
end_task = BashOperator(
    task_id='end_task',
    bash_command=bash_command,
    trigger_rule='all_success',
    dag=dag
)
# Creating dependency on previous tasks
run_sub_task >> end_task
end_task gets triggered even though several of its parent tasks (the run_sub_task instances) have not finished.
Can anybody help with this?
Upvotes: 0
Views: 728
Reputation: 1758
Have you checked in the Airflow UI whether the sub tasks and the end task are linked correctly in the graph view?
I think the dependencies are not being set correctly: it seems you instantiate end_task in the same loop where you instantiate its parent tasks, so not every sub task ends up upstream of it. Try something like:
bash_command = <some bash command>
# Create end_task once, outside the loops, so every sub task links to the same instance
end_task = BashOperator(
    task_id='end_task',
    bash_command=bash_command,
    trigger_rule='all_success',
    dag=dag
)

for partner in query_template_dict['partner_list']:
    applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
    for task in applicable_tasks:
        # your previous code ...
        # ...
        # ...
        for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:
            run_sub_task = BashOperator(
                task_id=partner + '_' + task + '_' + sub_tasks,
                bash_command=bash_command,
                trigger_rule='all_success',
                dag=dag
            )
            # Create dependency on previous and next tasks
            run_bq_cmd >> run_sub_task >> end_task
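To see why the place where you write the `>>` matters, here is a small sketch in plain Python. The `Task` class is a hypothetical stand-in, not the Airflow API. It only records upstream task ids, in the same spirit as `a >> b` making `b` downstream of `a`. It assumes the final `run_sub_task >> end_task` in the question runs once, after the loops have finished. If your real code is indented differently, the cause may be something else.

```python
# Toy stand-in for an Airflow task (hypothetical, NOT the Airflow API).
# It only records which task ids have been declared upstream of it.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # a >> b: register a as upstream of b and return b so chains work
        other.upstream.add(self.task_id)
        return other


partners = ["val1", "val2"]

# Pattern 1: link to end_task once, after the loop has finished.
end_after_loop = Task("end_task")
for partner in partners:
    sub_task = Task(partner + "_table_layer3_activity")
# sub_task now refers only to the task created in the last iteration.
sub_task >> end_after_loop

# Pattern 2: create end_task once, link every sub task inside the loop.
end_in_loop = Task("end_task")
for partner in partners:
    sub_task = Task(partner + "_table_layer3_activity")
    sub_task >> end_in_loop

print(sorted(end_after_loop.upstream))  # only the last sub task
print(sorted(end_in_loop.upstream))     # every sub task
```

With pattern 1 the end task has a single parent, so all_success is satisfied as soon as that one task succeeds, whatever state the others are in. In a real DAG you can check this by printing `end_task.upstream_task_ids` after the DAG file has been parsed.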
Upvotes: 1