Reputation: 81
Let’s say this is my dag: A >> B >> C
If task B raises an exception, I want to skip the task instead of failing it. However, I don’t want to skip task C. I looked into AirflowSkipException and the soft_fail sensor but they both forcibly skip downstream tasks as well. Does anyone have a way to make this work?
Thanks!
Upvotes: 8
Views: 15265
Reputation: 476
The currently posted answers either touch on a different topic or don't seem to be fully correct.
Adding the all_failed trigger rule to Task-C won't work for the OP's example DAG A >> B >> C unless Task-A also ends in the failed state, which is most probably not desirable.
The OP was, in fact, very close, because the expected behavior can be achieved with a mix of AirflowSkipException and the none_failed trigger rule:
from datetime import datetime
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="mydag",
    start_date=datetime(2022, 1, 18),
    schedule_interval="@once"
) as dag:

    def task_b():
        # Raising AirflowSkipException marks Task-B as skipped instead of failed
        raise AirflowSkipException

    A = DummyOperator(task_id="A")
    B = PythonOperator(task_id="B", python_callable=task_b)
    # none_failed lets C run as long as no upstream task actually failed
    C = DummyOperator(task_id="C", trigger_rule="none_failed")

    A >> B >> C
which Airflow executes as follows: A succeeds, B ends up skipped, and C still runs successfully.
What does this rule mean? From the Trigger Rules documentation:
none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped
So basically we can catch the actual exception in our code and raise the mentioned Airflow exception, which "forces" the task state to change from failed to skipped.
However, without the trigger_rule argument on Task-C, every task downstream of Task-B would also end up marked as skipped.
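For example, a minimal sketch of that "catch and re-raise" pattern inside Task-B's callable (the ValueError here is just a stand-in for whatever exception your real code raises):

from airflow.exceptions import AirflowSkipException

def task_b():
    try:
        # Placeholder for the real Task-B work; fails on purpose for the example
        raise ValueError("upstream data not ready")
    except ValueError as err:
        # Re-raise as a skip: the task instance ends up skipped instead of failed,
        # and Task-C's none_failed trigger rule still allows it to run
        raise AirflowSkipException(f"Skipping B: {err}")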
Upvotes: 10
Reputation: 591
You can refer to the Airflow documentation on trigger_rule.
trigger_rule allows you to configure the task's execution dependency. Generally, a task is executed when all of its upstream tasks succeed, but you can change that to any of the other trigger rules provided in Airflow. The all_failed trigger rule executes a task only when all upstream tasks have failed, which would accomplish what you outlined.
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="my_dag",
    start_date=datetime(2021, 4, 5),
    schedule_interval='@once',
) as dag:
    p = PythonOperator(
        task_id='fail_task',
        # The callable expects an argument it never receives, so this task fails on purpose
        python_callable=lambda x: 1,
    )
    t = PythonOperator(
        task_id='run_task',
        python_callable=lambda: 1,
        # run_task executes only once every upstream task has failed
        trigger_rule=TriggerRule.ALL_FAILED
    )

    p >> t
Upvotes: 2
Reputation: 1051
You can change the trigger_rule in your task declaration. With all_done, the task runs once all of its upstream tasks have finished, regardless of whether they succeeded, failed, or were skipped.
task = BashOperator(
    task_id="task_C",
    bash_command="echo hello world",
    # Run C once all upstream tasks have finished, regardless of their final state
    trigger_rule="all_done",
    dag=dag
)
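For comparison, a minimal sketch of the OP's A >> B >> C chain with this rule (the dag_id and the deliberate ValueError are illustrative): B still ends up recorded as failed rather than skipped, but C runs anyway.

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="mydag_all_done",
    start_date=datetime(2022, 1, 18),
    schedule_interval="@once",
) as dag:

    def task_b():
        # Simulated failure: with all_done on C, B stays failed but C still runs
        raise ValueError("B failed")

    A = DummyOperator(task_id="A")
    B = PythonOperator(task_id="B", python_callable=task_b)
    C = DummyOperator(task_id="C", trigger_rule="all_done")

    A >> B >> C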
Upvotes: 2