Reputation: 503
The requirement is: when Task A fails, I need to trigger Task D1. Similarly, when Task B fails, Task D2 needs to be triggered.
I have coded it as below. With this code, when Task A fails, both Task D1 and Task D2 are triggered. How can I solve this?
def do_op1_work(**kwargs):
    x = kwargs['dag_run'].conf.get('x')
    log.info('x: ' + str(x))
    if x == 0:
        raise ValueError('Manual Exception')

def do_op2_work(**kwargs):
    y = kwargs['dag_run'].conf.get('y')
    log.info('y: ' + str(y))
    if y == 0:
        raise ValueError('Manual Exception')
with DAG(dag_id='fulfill_uv', schedule_interval=None, default_args=default_args, catchup=False) as dag:
    op1 = PythonOperator(task_id='A', python_callable=do_op1_work, provide_context=True)
    op2 = PythonOperator(task_id='B', python_callable=do_op2_work, provide_context=True)
    op3 = DummyOperator(task_id='C')
    op4 = DummyOperator(task_id='D1', trigger_rule='all_failed')
    op5 = DummyOperator(task_id='D2', trigger_rule='all_failed')
    op6 = DummyOperator(task_id='E', trigger_rule='one_success')

    op2.set_upstream(op1)
    op3.set_upstream(op2)
    op4.set_upstream(op1)
    op5.set_upstream(op2)
    op6.set_upstream(op3)
    op6.set_upstream(op4)
    op6.set_upstream(op5)
Upvotes: 1
Views: 661
Reputation: 5110
When task A fails, task B will have the status upstream_failed, which is considered a failure, so in that case D2 will be executed as well.
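To see why both D1 and D2 fire, the all_failed trigger rule can be sketched in plain Python (an illustrative simplification, not Airflow's actual implementation; it assumes, as described above, that upstream_failed counts as a failure):

```python
def all_failed(upstream_states):
    # A task with trigger_rule='all_failed' fires when every upstream
    # task ended in 'failed' or 'upstream_failed'.
    return all(s in ("failed", "upstream_failed") for s in upstream_states)

# D1 is downstream of A, D2 is downstream of B.
# When A fails, B is marked upstream_failed, so both rules evaluate True:
print(all_failed(["failed"]))           # True -> D1 fires (expected)
print(all_failed(["upstream_failed"]))  # True -> D2 fires too (the problem)
print(all_failed(["success"]))          # False -> no failure task fires
```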
To simplify the logic of your DAG, and to bypass this problem, you can create two BranchPythonOperator tasks:

- the first one runs D1 if A failed, or B if it succeeded
- the second one runs D2 if B failed, or C if it succeeded
To fetch the state:
def get_state(task_id, **context):
    return context["dag_run"].get_task_instance(task_id).state
For the branch operators:
from airflow.operators.python_operator import BranchPythonOperator

def branch_func(task_to_check, task_to_run_on_success, task_to_run_on_fail, **context):
    task_state = get_state(task_to_check, **context)
    if task_state == "success":  # Airflow reports a finished task's state as "success"
        return task_to_run_on_success
    else:
        return task_to_run_on_fail
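The routing decision itself is plain Python, so it can be sanity-checked without Airflow. A minimal sketch (the state argument stands in for the get_state lookup; task names match the DAG above):

```python
def pick_next_task(state, task_on_success, task_on_fail):
    # Mirrors branch_func: take the success path only when the checked
    # task finished with state "success"; any other state (failed,
    # upstream_failed, ...) routes to the failure path.
    return task_on_success if state == "success" else task_on_fail

print(pick_next_task("success", "B", "D1"))          # A succeeded -> run B
print(pick_next_task("failed", "B", "D1"))           # A failed -> run D1
print(pick_next_task("upstream_failed", "C", "D2"))  # B never ran -> run D2
```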
branch1 = BranchPythonOperator(
    task_id='branch_task_1',
    provide_context=True,
    python_callable=branch_func,
    op_kwargs={'task_to_check': 'A', 'task_to_run_on_success': 'B', 'task_to_run_on_fail': 'D1'},
    trigger_rule='all_done',
)

branch2 = BranchPythonOperator(
    task_id='branch_task_2',
    provide_context=True,
    python_callable=branch_func,
    op_kwargs={'task_to_check': 'B', 'task_to_run_on_success': 'C', 'task_to_run_on_fail': 'D2'},
    trigger_rule='all_done',
)
And for the dependencies:
A >> branch1 >> [B, D1]
B >> branch2 >> [C, D2]
[C, D1, D2] >> E
For E, you use trigger_rule='one_success'. For the other tasks, you keep the default value, all_success.
Upvotes: 1