elaspog

Reputation: 1709

Behavior change in 'skipped' status propagation between Airflow v1 and v2?

I've noticed that in Airflow 1, when a task fails, the task directly downstream of it gets the upstream_failed status, but the task after that gets the skipped status with the ALL_SUCCESS and ONE_SUCCESS trigger rules.

This worked well for a handler task with the NONE_SKIPPED trigger rule, which should be executed only when none of its upstream tasks has been skipped (so they may have success, failed or upstream_failed status), and which was skipped when any of its upstream tasks was skipped. This was convenient because tasks with upstream_failed status were followed by skipped tasks.

Airflow 1: [screenshot of the DAG run]

In Airflow 2 the situation is different: the same trigger rule configuration behaves differently, and I don't understand the change.

The handler gets executed in each case, because the failed status is always followed by upstream_failed status and the upstream_failed status is never followed by skipped status.

Airflow 2: [screenshot of the DAG run]
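To make the difference concrete, here is a minimal sketch of the NONE_SKIPPED decision as described above. It is a simplified model, not Airflow's actual implementation: the function name `none_skipped_outcome` is hypothetical, states are plain strings, and all upstream tasks are assumed to have finished.

```python
# Simplified model of the NONE_SKIPPED trigger rule as described in this question.
# This is NOT Airflow's implementation; it assumes all upstream tasks are done.
def none_skipped_outcome(upstream_states):
    """Return 'skipped' if any upstream task was skipped, otherwise 'runs'."""
    if "skipped" in upstream_states:
        return "skipped"
    return "runs"


# State of the handler's direct upstream task, as observed in each version.
observed = {
    "Airflow 1": ["skipped"],
    "Airflow 2": ["upstream_failed"],
}

for version, states in observed.items():
    print(version, "->", none_skipped_outcome(states))
```

Under this model the handler is skipped with the Airflow 1 states and runs with the Airflow 2 states, which matches what the two runs show.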

This is the test code I was experimenting with:

import datetime
from datetime import timedelta

from airflow.models import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def fail_this_task():
    raise Exception("I am angry at Airflow, so I deliberately fail.")

dag_args = {
    # Midnight of the previous day.
    'start_date': datetime.datetime.combine(datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

with DAG(
    default_args        = dag_args,
    dag_id              = 'my_annoying_dag',
    schedule_interval   = None,
    catchup             = False,  # DAG-level argument; it has no effect inside default_args
) as dag:

    FailDeliberately = PythonOperator(
        task_id         = 'fail_deliberately',
        python_callable = fail_this_task
    )

    AlwaysUpstreamFailed = DummyOperator(
        task_id         = 'always_upstream_failed',
        trigger_rule    = TriggerRule.ALL_SUCCESS
    )

    VersionDependent = DummyOperator(
        task_id         = 'skipped_in_v1_but_upstream_failed_in_v2',
        trigger_rule    = TriggerRule.ONE_SUCCESS
    )

    FailHandler = DummyOperator(
        task_id         = 'this_handler_should_be_skipped',
        trigger_rule    = TriggerRule.NONE_SKIPPED
    )
    
    FailDeliberately >> AlwaysUpstreamFailed >> VersionDependent >> FailHandler

My original solution is more complex: the handler has more inputs, including the task that is always in upstream_failed state and the task that follows the one whose status differs between the two Airflow versions, so it's not easy to refactor.

Upvotes: 1

Views: 516

Answers (1)

Elad Kalif

Reputation: 16119

I think you are referring to the PR that changed the behavior of ONE_SUCCESS so that the task is marked UPSTREAM_FAILED when no upstream task succeeds.

The previous behavior you described was actually a bug. When a task fails because of an upstream task, it means that something is not right. Skipped tasks, on the other hand, are considered expected behavior (branching operators are an example). This is also why the PR was targeted at Airflow 2.1: it's a bug fix, not a breaking change.
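As a rough illustration of the change, the sketch below contrasts the two ONE_SUCCESS behaviors discussed in this thread. It is a simplified model under stated assumptions, not the code from the PR: the function names are hypothetical, states are plain strings, all upstream tasks are assumed to have finished, and the "all upstream skipped" branch in the new version is my assumption about how skips continue to propagate.

```python
# Simplified model of ONE_SUCCESS before and after the fix; NOT Airflow's code.
# Assumes every upstream task has already reached a final state.
def one_success_old(upstream_states):
    """Old behavior as described in the question: no success -> skipped."""
    if "success" in upstream_states:
        return "runs"
    return "skipped"


def one_success_new(upstream_states):
    """New behavior as described in the answer: no success -> upstream_failed."""
    if "success" in upstream_states:
        return "runs"
    # Assumption: if every upstream task was skipped, the skip still propagates.
    if all(state == "skipped" for state in upstream_states):
        return "skipped"
    return "upstream_failed"


for states in (["upstream_failed"], ["skipped"], ["success", "failed"]):
    print(states, "old:", one_success_old(states), "new:", one_success_new(states))
```

In this model only the first case differs between the two versions: an upstream failure used to be reported as skipped and is now reported as upstream_failed, while genuine skips and partial successes are treated the same as before.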

Upvotes: 1
