I've noticed that in Airflow 1, when a task fails, the following task has upstream_failed status, but the task which follows it next has skipped status in case of the ALL_SUCCESS and ONE_SUCCESS trigger rules.
This worked fine for a handler task with the NONE_SKIPPED trigger rule, which should be executed only when none of its preceding tasks has been skipped (so they could have success, failed or upstream_failed status), and which was skipped when any of its preceding tasks was skipped.
This was good because tasks with upstream_failed status were followed by skipped tasks.
In Airflow 2 the situation is different, because the same trigger rule configuration works differently (and I don't understand this change).
The handler gets executed in every case, because the failed status is always followed by upstream_failed status, and the upstream_failed status is never followed by skipped status.
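To make the Airflow 2 behavior concrete, here is a simplified, hypothetical model of the NONE_SKIPPED decision in plain Python. It is not Airflow's actual implementation; the function name `none_skipped_decision` and the string states are illustrative assumptions. It only encodes the rule as described above: the task is skipped if any upstream task was skipped, and otherwise runs once all upstream tasks are done, whatever their other states are.

```python
# Simplified, HYPOTHETICAL model of the NONE_SKIPPED trigger rule.
# This is not Airflow's implementation; states are plain strings here.

SUCCESS = "success"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"

# States treated as "finished" in this sketch.
FINISHED = {SUCCESS, FAILED, UPSTREAM_FAILED, SKIPPED}


def none_skipped_decision(upstream_states):
    """Return "skipped", "run", or "wait" for a NONE_SKIPPED task."""
    if any(state == SKIPPED for state in upstream_states):
        return "skipped"
    if all(state in FINISHED for state in upstream_states):
        # failed and upstream_failed upstreams do not block NONE_SKIPPED
        return "run"
    # some upstream task has not finished yet
    return "wait"


# Airflow 1 chain: the handler's direct upstream ended up skipped.
print(none_skipped_decision([SKIPPED]))  # skipped
# Airflow 2 chain: the direct upstream is upstream_failed, so the handler runs.
print(none_skipped_decision([UPSTREAM_FAILED]))  # run
# A handler with several inputs runs as long as none of them was skipped.
print(none_skipped_decision([UPSTREAM_FAILED, FAILED, SUCCESS]))  # run
```

In this model, the only thing that stops the handler is a skipped input, which is why it fires in Airflow 2 once upstream_failed no longer turns into skipped further down the chain.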
This is the test code I was playing with:
```python
import datetime
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.trigger_rule import TriggerRule
# 1.x-style import paths; Airflow 2 still accepts them (with deprecation
# warnings), so the same file runs on both versions.
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def fail_this_task():
    raise Exception("I am angry at Airflow, so I deliberately fail.")


dag_args = {
    # midnight of the previous day
    'start_date': datetime.datetime.combine(
        datetime.datetime.today() - timedelta(days=1),
        datetime.datetime.min.time(),
    ),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

with DAG(
    default_args=dag_args,
    dag_id='my_annoying_dag',
    schedule_interval=None,
    catchup=False,  # a DAG argument, so it goes here rather than in default_args
) as dag:
    FailDeliberately = PythonOperator(
        task_id='fail_deliberately',
        python_callable=fail_this_task,
    )
    # Its only upstream fails, so this always ends up upstream_failed.
    AlwaysUpstreamFailed = DummyOperator(
        task_id='always_upstream_failed',
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )
    # skipped in Airflow 1, upstream_failed in Airflow 2
    VersionDependent = DummyOperator(
        task_id='skipped_in_v1_but_upstream_failed_in_v2',
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )
    FailHandler = DummyOperator(
        task_id='this_handler_should_be_skipped',
        trigger_rule=TriggerRule.NONE_SKIPPED,
    )

    FailDeliberately >> AlwaysUpstreamFailed >> VersionDependent >> FailHandler
```
My original solution is more complex: the handler has more inputs, including one from the task which is always in upstream_failed state and one from the task which follows the task whose status differs between the two Airflow versions, and so on, so it's not that easy to refactor.
I think you are referring to the PR which changed the behavior of ONE_SUCCESS so that it marks the task as UPSTREAM_FAILED when no upstream task succeeds.
The previous behavior you described is actually a bug. When a task fails because of an upstream task, it means that something is not right. On the other hand, skipped tasks are considered expected behavior (branching operators are an example of that). This is also why the PR was targeted at Airflow 2.1: it's not a breaking change, it's a bug fix.
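The change described above can be sketched with a small, hypothetical model of ONE_SUCCESS. The function `one_success_decision` and its `airflow_2_1_or_later` flag are illustrative assumptions, not Airflow's actual code. The model only covers the case discussed in this thread, where all upstream tasks have finished and none of them succeeded: per the question, older releases marked the task skipped, and per the PR, 2.1 and later mark it upstream_failed.

```python
# HYPOTHETICAL sketch of how ONE_SUCCESS resolves; not Airflow's implementation.
# It deliberately ignores other special cases (for example, every upstream
# being skipped), which real Airflow versions may handle differently.

FINISHED = {"success", "failed", "upstream_failed", "skipped"}


def one_success_decision(upstream_states, airflow_2_1_or_later):
    """Return "run", "wait", "skipped", or "upstream_failed"."""
    if "success" in upstream_states:
        # At least one upstream succeeded: the task can run right away.
        return "run"
    if not all(state in FINISHED for state in upstream_states):
        # Some upstream task may still succeed, so keep waiting.
        return "wait"
    # All upstream tasks are done and none of them succeeded.
    if airflow_2_1_or_later:
        return "upstream_failed"  # behavior after the bug fix
    return "skipped"  # earlier behavior described in the question


# The task after 'always_upstream_failed' in the test DAG:
print(one_success_decision(["upstream_failed"], airflow_2_1_or_later=False))  # skipped
print(one_success_decision(["upstream_failed"], airflow_2_1_or_later=True))  # upstream_failed
```

Under the fixed behavior, a failure keeps propagating as upstream_failed, so a downstream NONE_SKIPPED task no longer sees a skipped input and runs.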