Reputation: 11
I have an Airflow DAG with branching, something like this:
Branch >> A1 >> A2 >> A3 >> A4 >> A5 >> A6 >> A7 >> A8 >> A9 >> END
Branch >> B1 >> END
When branch B1 is executed, everything in A is skipped. However, the skipping proceeds one layer at a time: A1 is examined and skipped, then A2 is examined and skipped, and so on.
Because of the delay between task evaluations, this sequential skipping takes a very long time when there are many layers.
Is there a way to skip all of these tasks in one shot and go directly to END? Thanks.
Upvotes: 1
Views: 3690
Reputation: 4873
I think that ShortCircuitOperator may help you:
Allows a workflow to continue only if a condition is met. Otherwise, the workflow "short-circuits" and downstream tasks are skipped. The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with a state of "skipped". If the condition is True, downstream tasks proceed as normal. The condition is determined by the result of python_callable.
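To picture what "any downstream tasks are marked with a state of skipped" means for the layout in the question, here is a minimal pure-Python sketch. It is only a toy model, not Airflow code and not Airflow's implementation: the helper `all_downstream` and the task name `check_a` (standing in for a short-circuit task placed in front of A1) are hypothetical. It collects every task reachable from the short-circuit task in a single pass, instead of visiting one layer per scheduling cycle.

```python
from collections import deque


def all_downstream(graph, start):
    """Return every task reachable from `start`, excluding `start` itself."""
    seen = set()
    queue = deque(graph.get(start, []))
    while queue:
        task = queue.popleft()
        if task in seen:
            continue
        seen.add(task)
        queue.extend(graph.get(task, []))
    return seen


# Hypothetical layout modeled on the question:
# check_a >> A1 >> ... >> A9 >> END, and B1 >> END.
a_chain = ["check_a"] + [f"A{i}" for i in range(1, 10)] + ["END"]
graph = {up: [down] for up, down in zip(a_chain, a_chain[1:])}
graph["B1"] = ["END"]

# Everything that would be marked "skipped" in one go in this toy model.
skipped = all_downstream(graph, "check_a")
print(sorted(skipped))
```

Note that in this model END is itself downstream of A9, so it ends up in the skip set as well. Whether a real DAG behaves the same way depends on the Airflow version and operator settings, so it is worth checking how END is wired before relying on this approach.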
You can find the following example among the example_dags included in the Airflow distribution:
"""Example DAG demonstrating the usage of the ShortCircuitOperator."""
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.utils import dates

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_short_circuit_operator',
    default_args=args,
    start_date=dates.days_ago(2),
    tags=['example'],
) as dag:

    cond_true = ShortCircuitOperator(
        task_id='condition_is_True',
        python_callable=lambda: True,
    )

    cond_false = ShortCircuitOperator(
        task_id='condition_is_False',
        python_callable=lambda: False,
    )

    ds_true = [DummyOperator(task_id='true_' + str(i)) for i in [1, 2]]
    ds_false = [DummyOperator(task_id='false_' + str(i)) for i in [1, 2]]

    chain(cond_true, *ds_true)
    chain(cond_false, *ds_false)
If you increase the number of tasks downstream of condition_is_False (to 19 with the line below), you will notice that all of them are marked as skipped at the same time:
    ds_false = [DummyOperator(task_id="false_" + str(i)) for i in range(1, 20)]
From the Gantt view: [screenshot of the Gantt chart for this run]
Upvotes: 2