codninja0908

Reputation: 567

All_Success trigger rule is not working as expected

I am using trigger_rule = "all_success" in my workflow so that the next task is triggered only after all of its parent tasks have succeeded. But that is not what happens.

Job flow:

Task 1 >> Task 2a, Task 2b, Task 2c (run in parallel) >> Task c

Scenario 1 - Task 2a goes into up_for_retry, yet Task c still gets executed.

Scenario 2 - Task 2a is still in running status, yet Task c still gets executed.

Note - trigger_rule is set to all_success everywhere.

Ideally, Task c should not be triggered until Tasks 2a, 2b and 2c have all completed successfully.
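
For reference, the behaviour expected from all_success can be sketched in plain Python. This is only an illustration of the rule, not Airflow's implementation; the function name `all_success_met` and the task keys are made up for the example, and the state strings simply mirror the statuses mentioned above.

```python
def all_success_met(upstream_states):
    """Return True only if every upstream task has state 'success'."""
    return all(state == "success" for state in upstream_states.values())


# Scenario 1: Task 2a is up_for_retry, so Task c should wait.
scenario_1 = {"task_2a": "up_for_retry", "task_2b": "success", "task_2c": "success"}
# Scenario 2: Task 2a is still running, so Task c should wait.
scenario_2 = {"task_2a": "running", "task_2b": "success", "task_2c": "success"}
# All three parents succeeded, so Task c may run.
all_done = {"task_2a": "success", "task_2b": "success", "task_2c": "success"}

print(all_success_met(scenario_1))  # False
print(all_success_met(scenario_2))  # False
print(all_success_met(all_done))    # True
```

The rule only looks at the tasks that are actually registered as upstream of Task c, so it matters which parents the scheduler believes Task c has.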

query_template_dict = {
    'partner_list': ['val1', 'val2'],
    'google_project': 'project_name',
    'queries': {
        'layer3': {
            'template': 'temp.sql'
        }
    },
    'applicable_tasks': {
        'val1': {
            'table_layer3': ['activity']
        },
        'val2': {
            'table_layer3': ['activity'],
        }
    }
}


for partner in query_template_dict['partner_list']:
    # Loop over applicable report queries for a partner
    applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
    for task in applicable_tasks:
                                              
        query_params = [
            {
                "name": "col1",
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": col1}
            }
        ]

        run_bq_cmd = BigQueryOperator(
            task_id=partner + '-' + task,
            trigger_rule='all_success',
            allow_large_results=True,
            dag=dag
        )
        # Creating dependency on previous tasks
        run_dummy >> run_bq_cmd
        
        for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:

            run_sub_task = BashOperator(
                task_id=partner + '_' + task + '_' + sub_tasks,
                bash_command=bash_command,
                trigger_rule='all_success',
                dag=dag
            )
            run_bq_cmd >> run_sub_task
            bash_command = <some bash command>
            end_task = BashOperator(
                task_id='end_task',
                bash_command=bash_command,
                trigger_rule='all_success',
                dag=dag
            )
            # Creating dependency on previous tasks
            run_sub_task >> end_task

end_task gets called even when several of its parent tasks (the run_sub_task instances) have not finished.

Can anybody help with this?

Upvotes: 0

Views: 728

Answers (1)

UJIN

Reputation: 1758

Have you checked in the Airflow UI (graph view) whether the sub-tasks and end_task are actually linked the way you expect?

I think the dependencies are not being set correctly. It seems you instantiate end_task inside the same loop where you instantiate its parent tasks, so a new end_task is created on every iteration instead of one shared task that all sub-tasks feed into. Try something like:

# Create end_task once, before the loops, so every sub-task links to the same task
bash_command    = <some bash command>
end_task = BashOperator(task_id='end_task',
                        bash_command=bash_command,
                        trigger_rule='all_success',
                        dag=dag
                       )

for partner in query_template_dict['partner_list']:

    # your previous code ...
    # ...
    # ...

    for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:

        run_sub_task = BashOperator(task_id=partner + '_' + task + '_' + sub_tasks,
                                    bash_command=bash_command,
                                    trigger_rule='all_success',
                                    dag=dag
                                   )

        # Create dependency on previous and next tasks
        run_bq_cmd >> run_sub_task >> end_task
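
As a rough check of the wiring idea, here is a plain-Python toy model. It is not Airflow: the `Task` class and its `upstream_ids` attribute are hypothetical stand-ins for an operator and its list of upstream tasks. It also assumes end_task is meant to be shared by every partner, so it is created exactly once, before any loop.

```python
class Task:
    """Toy stand-in for an operator: it only tracks upstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = set()

    def __rshift__(self, other):
        # a >> b records a as an upstream of b and returns b so chains work.
        other.upstream_ids.add(self.task_id)
        return other


applicable_tasks = {
    "val1": {"table_layer3": ["activity"]},
    "val2": {"table_layer3": ["activity"]},
}

# Created once, before any loop, so every sub-task links to the same object.
end_task = Task("end_task")

for partner, tasks in applicable_tasks.items():
    for task, sub_tasks in tasks.items():
        run_bq_cmd = Task(partner + "-" + task)
        for sub_task in sub_tasks:
            run_sub_task = Task(partner + "_" + task + "_" + sub_task)
            run_bq_cmd >> run_sub_task >> end_task

print(sorted(end_task.upstream_ids))
# ['val1_table_layer3_activity', 'val2_table_layer3_activity']
```

With a single end_task object, both sub-tasks end up in its upstream set, which is what all_success needs in order to wait for all of them. The graph view in the UI should show the same fan-in if the real DAG is wired this way.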

Upvotes: 1
