karthik_varma_k

Reputation: 503

How to create a multiple conditional tasks in Airflow?


The requirement is: when Task A fails, Task D1 should be triggered. Similarly, when Task B fails, Task D2 should be triggered.

I have coded it as below. With this code, when Task A fails, both Task D1 and Task D2 are triggered. How can I solve this?

import logging

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

log = logging.getLogger(__name__)


def do_op1_work(**kwargs):
    x = kwargs['dag_run'].conf.get('x')
    log.info('x: ' + str(x))
    if x == 0:
        raise ValueError('Manual Exception')


def do_op2_work(**kwargs):
    y = kwargs['dag_run'].conf.get('y')
    log.info('y: ' + str(y))
    if y == 0:
        raise ValueError('Manual Exception')


with DAG(dag_id='fulfill_uv', schedule_interval=None, default_args=default_args, catchup=False) as dag:
    op1 = PythonOperator(task_id='A', python_callable=do_op1_work, provide_context=True)
    op2 = PythonOperator(task_id='B', python_callable=do_op2_work, provide_context=True)
    op3 = DummyOperator(task_id='C')
    op4 = DummyOperator(task_id='D1', trigger_rule='all_failed')  # should only run when A fails
    op5 = DummyOperator(task_id='D2', trigger_rule='all_failed')  # should only run when B fails
    op6 = DummyOperator(task_id='E', trigger_rule='one_success')

    op2.set_upstream(op1)
    op3.set_upstream(op2)
    op4.set_upstream(op1)
    op5.set_upstream(op2)
    op6.set_upstream(op3)
    op6.set_upstream(op4)
    op6.set_upstream(op5)

Upvotes: 1

Views: 661

Answers (1)

Hussein Awala

Reputation: 5110

When task A fails, task B gets the state upstream_failed, which the all_failed trigger rule counts as a failure, so in this case D2 is executed as well.

To simplify the logic of your DAG and to avoid this problem, you can create two BranchPythonOperator tasks:

  1. One that fetches the state of task A and runs D1 if it failed, or B if it succeeded
  2. A second one that fetches the state of task B and runs D2 if it failed, or C if it succeeded

To fetch the state:

def get_state(task_id, **context):
    return context["dag_run"].get_task_instance(task_id).state
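
The state returned here is a plain string such as "success", "failed" or "upstream_failed" (the constants defined in airflow.utils.state.State).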

For the branch operators:

from airflow.operators.python_operator import BranchPythonOperator

def branch_func(task_to_check, task_to_run_on_success, task_to_run_on_fail, **context):
    task_state = get_state(task_to_check, **context)
    if task_state == "success":  # Airflow's success state string is "success"
        return task_to_run_on_success
    else:
        return task_to_run_on_fail

branch1 = BranchPythonOperator(
    task_id='branch_task_1',
    provide_context=True,
    python_callable=branch_func,
    op_kwargs={'task_to_check': 'A', 'task_to_run_on_success': 'B', 'task_to_run_on_fail': 'D1'},
    trigger_rule='all_done',
)

branch2 = BranchPythonOperator(
    task_id='branch_task_2',
    provide_context=True,
    python_callable=branch_func,
    op_kwargs={'task_to_check': 'B', 'task_to_run_on_success': 'C', 'task_to_run_on_fail': 'D2'},
    trigger_rule='all_done',
)

And for the dependencies (A, B, C, D1, D2 and E being the operators defined in the question):

A >> branch1 >> [B, D1]
B >> branch2 >> [C, D2]
[C, D1, D2] >> E

For E, keep trigger_rule='one_success'. For the other tasks (B, C, D1 and D2), keep the default trigger rule all_success; they now run only when the branch operator selects them.
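
Putting it all together, here is a minimal sketch of the whole DAG under this approach (assuming Airflow 1.x-style imports to match the provide_context=True usage above, and reusing do_op1_work, do_op2_work and default_args as defined in the question):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator


def get_state(task_id, **context):
    # State of another task in the same DagRun, e.g. "success" or "failed"
    return context["dag_run"].get_task_instance(task_id).state


def branch_func(task_to_check, task_to_run_on_success, task_to_run_on_fail, **context):
    if get_state(task_to_check, **context) == "success":
        return task_to_run_on_success
    return task_to_run_on_fail


with DAG(dag_id='fulfill_uv', schedule_interval=None,
         default_args=default_args, catchup=False) as dag:
    a = PythonOperator(task_id='A', python_callable=do_op1_work, provide_context=True)
    b = PythonOperator(task_id='B', python_callable=do_op2_work, provide_context=True)
    c = DummyOperator(task_id='C')
    d1 = DummyOperator(task_id='D1')
    d2 = DummyOperator(task_id='D2')
    e = DummyOperator(task_id='E', trigger_rule='one_success')

    branch1 = BranchPythonOperator(
        task_id='branch_task_1',
        provide_context=True,
        python_callable=branch_func,
        op_kwargs={'task_to_check': 'A', 'task_to_run_on_success': 'B', 'task_to_run_on_fail': 'D1'},
        trigger_rule='all_done',  # run whether A succeeded or failed
    )
    branch2 = BranchPythonOperator(
        task_id='branch_task_2',
        provide_context=True,
        python_callable=branch_func,
        op_kwargs={'task_to_check': 'B', 'task_to_run_on_success': 'C', 'task_to_run_on_fail': 'D2'},
        trigger_rule='all_done',  # run whether B succeeded or failed
    )

    a >> branch1 >> [b, d1]
    b >> branch2 >> [c, d2]
    [c, d1, d2] >> e

Note that D1 and D2 no longer need trigger_rule='all_failed': the branch operators now decide whether they run.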

Upvotes: 1
