Ryan

Reputation: 1242

How to branch multiple paths in Airflow DAG using branch operator?

This is what I want, but I don't know how to achieve it in Airflow: right now both tasks are being executed.

[Image: the desired DAG structure]

To summarize: how should I structure this? This is my current code:

(t1 >> t2 >> option_1 >> complete)
(t1 >> t2 >> option_2 >> do_x >> do_y >> complete)

t2 in this case is a branch operator.

I've also tried the list syntax, ... [option_1, option_2] ..., but I need a completely separate path to execute, not just a single task to be switched.
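For context on why the two chained statements are fine together: each `a >> b >> c` line only adds edges to the same graph, and in Airflow `a >> b` returns `b`, so chains read left to right. The sketch below mimics that with a hypothetical `Node` class (not Airflow's operator class) to show that the shared `t1 >> t2` edge is stored once and that `t2` fans out to both branch heads.

```python
# Hypothetical stand-in for an Airflow task; records edges when chained with >>.
class Node:
    def __init__(self, task_id: str, edges: set):
        self.task_id = task_id
        self.edges = edges  # shared set of (upstream, downstream) pairs

    def __rshift__(self, other: "Node") -> "Node":
        # Record the dependency, then return the right-hand task so that
        # a >> b >> c chains left to right (as Airflow's >> does).
        self.edges.add((self.task_id, other.task_id))
        return other


edges: set = set()
t1, t2, option_1, option_2, do_x, do_y, complete = (
    Node(name, edges)
    for name in ["t1", "t2", "option_1", "option_2", "do_x", "do_y", "complete"]
)

t1 >> t2 >> option_1 >> complete
t1 >> t2 >> option_2 >> do_x >> do_y >> complete

# t2 fans out to both branch heads; the repeated t1 >> t2 edge is kept once.
print(sorted(b for a, b in edges if a == "t2"))  # ['option_1', 'option_2']
print(len(edges))  # 7
```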

Upvotes: 5

Views: 14375

Answers (1)

Josh Fell

Reputation: 3589

The dependencies in your code are correct for branching. Make sure the BranchPythonOperator returns the task_id of the task at the start of the branch, based on whatever logic you need. More info on the BranchPythonOperator is in the Airflow documentation.

One last important note concerns the "complete" task. Since both branches converge on it, make sure its trigger_rule is set to "none_failed" (or the equivalent TriggerRule.NONE_FAILED constant) so the task doesn't get skipped. With the default "all_success" rule, the skip from the branch that was not chosen propagates down to "complete", so it would be skipped too.
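To make the skip behaviour concrete, here is a simplified pure-Python model of one run of the question's DAG. It is not Airflow's scheduler: the `UPSTREAM` table, the `run` function and the state strings are illustrative assumptions that mimic two behaviours: the branch task skips every direct downstream task it did not return, and the trigger rule decides a task's state from its upstream states.

```python
from typing import Dict, List

# Simplified model only; NOT Airflow internals.
UPSTREAM: Dict[str, List[str]] = {
    "t1": [],
    "t2": ["t1"],
    "option_1": ["t2"],
    "option_2": ["t2"],
    "do_x": ["option_2"],
    "do_y": ["do_x"],
    "complete": ["option_1", "do_y"],
}
ORDER = ["t1", "t2", "option_1", "option_2", "do_x", "do_y", "complete"]


def run(chosen: str, join_rule: str) -> Dict[str, str]:
    """Return each task's final state when the branch t2 returns `chosen`."""
    state: Dict[str, str] = {}
    for task in ORDER:
        ups = [state[u] for u in UPSTREAM[task]]
        rule = join_rule if task == "complete" else "all_success"
        failed = "failed" in ups or "upstream_failed" in ups
        if "t2" in UPSTREAM[task] and task != chosen:
            state[task] = "skipped"  # branch head not returned by t2
        elif rule == "all_success":
            # A failed upstream marks the task upstream_failed; a skipped one skips it.
            if failed:
                state[task] = "upstream_failed"
            elif "skipped" in ups:
                state[task] = "skipped"
            else:
                state[task] = "success"
        elif rule == "none_failed":
            # Runs as long as nothing upstream failed; skipped upstreams are fine.
            state[task] = "upstream_failed" if failed else "success"
        else:
            raise ValueError(f"unsupported rule: {rule}")
    return state


print(run("option_2", "none_failed")["complete"])  # success
print(run("option_2", "all_success")["complete"])  # skipped
print(run("option_1", "none_failed")["do_y"])      # skipped
```

Later Airflow 2 releases also offer a "none_failed_min_one_success" rule, which additionally requires at least one upstream success; it is a common choice for branch joins as well.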

Here is a quick test DAG for your reference:

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

from datetime import datetime


DEFAULT_ARGS = dict(
    start_date=datetime(2021, 5, 5),
    owner="airflow",
    retries=0,
)

DAG_ARGS = dict(
    dag_id="multi_branch",
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    catchup=False,
)


def random_branch():
    from random import randint

    # Return the task_id of the first task on the chosen path.
    return "option_1" if randint(1, 2) == 1 else "option_2"


with DAG(**DAG_ARGS) as dag:
    t1 = DummyOperator(task_id="t1")

    t2 = BranchPythonOperator(task_id="t2", python_callable=random_branch)

    option_1 = DummyOperator(task_id="option_1")

    option_2 = DummyOperator(task_id="option_2")

    do_x = DummyOperator(task_id="do_x")

    do_y = DummyOperator(task_id="do_y")

    # NONE_FAILED lets "complete" run even though one branch was skipped.
    complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)

    # Both chains share t1 >> t2; t2 then decides which path actually runs.
    t1 >> t2 >> option_1 >> complete
    t1 >> t2 >> option_2 >> do_x >> do_y >> complete

[Image: DAG with BranchPythonOperator]
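The random_branch callable above picks a path at random purely for testing. A real callable encodes some decision, but it still returns the task_id of the first task on the chosen path; do_x and do_y then run through their normal dependencies. The sketch below is hypothetical: the `choose_branch` name and the weekday/weekend rule are assumptions, and how the date reaches the callable (for example from the task context) is not shown.

```python
from datetime import date


def choose_branch(run_date: date) -> str:
    """Hypothetical branch callable: weekday runs take the short option_1
    path, weekend runs take option_2 -> do_x -> do_y.

    Returns the task_id of the FIRST task of the chosen path, never a task
    further down it such as do_x.
    """
    return "option_2" if run_date.weekday() >= 5 else "option_1"


print(choose_branch(date(2021, 5, 5)))  # option_1 (a Wednesday)
print(choose_branch(date(2021, 5, 8)))  # option_2 (a Saturday)
```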

Upvotes: 11
