Sheeeeesh

Reputation: 81

How to skip a task in airflow without skipping its downstream tasks?

Let’s say this is my dag: A >> B >> C

If task B raises an exception, I want to skip the task instead of failing it. However, I don’t want to skip task C. I looked into AirflowSkipException and the soft_fail sensor but they both forcibly skip downstream tasks as well. Does anyone have a way to make this work?

Thanks!

Upvotes: 8

Views: 15265

Answers (3)

nervuzz

Reputation: 476

The currently posted answers either touch on a different topic or do not seem to be fully correct.

Adding the all_failed trigger rule to Task-C won't work for OP's example DAG A >> B >> C unless Task-A ends in a failed state, which is most probably not desirable.

OP was, in fact, very close, because the expected behavior can be achieved with a mix of AirflowSkipException and the none_failed trigger rule:

from datetime import datetime

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="mydag",
    start_date=datetime(2022, 1, 18),
    schedule_interval="@once"
) as dag:

    def task_b():
        # Raising AirflowSkipException marks this task as "skipped"
        # instead of "failed".
        raise AirflowSkipException

    A = DummyOperator(task_id="A")
    B = PythonOperator(task_id="B", python_callable=task_b)
    # none_failed lets Task-C run as long as no upstream task failed,
    # i.e. every upstream task either succeeded or was skipped.
    C = DummyOperator(task_id="C", trigger_rule="none_failed")

    A >> B >> C

which Airflow executes as follows:

(Graph view screenshot: Task-A succeeds, Task-B is skipped, Task-C still runs and succeeds.)

What does this rule mean?

Trigger Rules

none_failed: All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped

So basically we can catch the actual exception in our code and raise the aforementioned Airflow exception, which "forces" the task state to change from failed to skipped. However, without the trigger_rule argument on Task-C, we would end up with Task-B's downstream tasks marked as skipped as well.
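For example, a minimal sketch of that catch-and-reraise pattern (do_work and the ValueError it raises are hypothetical stand-ins for whatever your task actually does):

from airflow.exceptions import AirflowSkipException

def task_b():
    try:
        do_work()  # hypothetical helper doing the actual work
    except ValueError as err:  # whatever exception your code can actually raise
        # Re-raise as AirflowSkipException so the task ends up "skipped"
        # rather than "failed"; with trigger_rule="none_failed" on Task-C,
        # the downstream task still runs.
        raise AirflowSkipException(f"Skipping Task-B: {err}")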

Upvotes: 10

Alan Ma

Reputation: 591

You can refer to the Airflow documentation on trigger_rule.

trigger_rule allows you to configure the task's execution dependency. By default, a task is executed only when all upstream tasks succeed. You can change that to any of the other trigger rules provided by Airflow. The all_failed trigger rule executes a task only when all upstream tasks fail, which would accomplish what you outlined.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
        dag_id="my_dag",
        start_date=datetime(2021, 4, 5),
        schedule_interval='@once',
) as dag:
    # The callable takes an argument that PythonOperator never supplies,
    # so this task raises TypeError at runtime and ends up "failed".
    p = PythonOperator(
        task_id='fail_task',
        python_callable=lambda x: 1,
    )

    t = PythonOperator(
        task_id='run_task',
        python_callable=lambda: 1,
        # Runs only once every upstream task has failed.
        trigger_rule=TriggerRule.ALL_FAILED,
    )

    p >> t

Upvotes: 2

knl

Reputation: 1051

You can change the trigger_rule in your task declaration. With all_done, task_C runs once all of its upstream tasks have finished, regardless of whether they succeeded, failed, or were skipped.

task = BashOperator(
    task_id="task_C",
    bash_command="echo hello world",
    trigger_rule="all_done",  # run after all upstream tasks finish, whatever their state
    dag=dag
)

Upvotes: 2
