Lidor Eliyahu Shelef

Reputation: 1332

Airflow PythonOperator - decide which task will be next - based on the status of previous tasks

I'm not sure whether this question belongs here, on Meta, or on another Stack Exchange site (let me know and I'll move it).

Is there a way to build a DAG in Airflow (in Python) using the PythonOperator, or another operator that works with Airflow 1.0 on Docker, where the python_callable decides which task runs next based on the statuses of the previous tasks?

I can already get the statuses of all the tasks. What I can't figure out is how to use those statuses to decide which task to go to next.

I've searched the web for a while now and haven't found anything very helpful.

I'm using Airflow 1.1 on Docker.

I don't have a code example, because that's what I'm trying to figure out, but here is the basic idea:

TaskA >> TaskB
if TaskB <= -1 go to TaskNegative
if TaskB == 0 go to TaskZero
if TaskB >= 1 go to TaskPositive
if TaskB == fail/null go to TaskFail
if TaskA == fail/null go to TaskC (TaskC would then do what TaskA did with TaskB, but with, for example, TaskD)

That is a very basic version of what I need to implement (there will be many more options and conditions). I can write all of that logic in Python; what I need is a way to plug it into an operator of some sort (again, one that works on Airflow 1.0 on Docker).
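As a minimal sketch of the "Python code logic" part, the rules above can be written as one plain function that returns the task_id to run next. The function name `choose_next_task`, the `states` dictionary (task_id to Airflow state string such as "success" or "failed"), and the assumption that TaskB returns an integer are all hypothetical; this code does not import Airflow and only shows the decision logic.

```python
from typing import Dict, Optional

SUCCESS = "success"  # Airflow's state string for a successful task instance


def choose_next_task(states: Dict[str, Optional[str]], task_b_result: Optional[int]) -> str:
    """Return the task_id that should run next (hypothetical branching logic).

    states maps task_id -> state string ("success", "failed", ...), or None if unknown.
    task_b_result is TaskB's integer result (assumed), or None if it produced nothing.
    """
    # TaskA did not succeed: take the fallback path (TaskC repeats A's work).
    if states.get("TaskA") != SUCCESS:
        return "TaskC"
    # TaskA succeeded, but TaskB failed or returned nothing.
    if states.get("TaskB") != SUCCESS or task_b_result is None:
        return "TaskFail"
    if task_b_result <= -1:
        return "TaskNegative"
    if task_b_result == 0:
        return "TaskZero"
    # For integers, the only remaining case is task_b_result >= 1.
    return "TaskPositive"
```

In a DAG, a function like this would be called from the python_callable of a branching operator, with every returned task_id wired downstream of that operator. For the TaskA failure case to reach the branch task at all, it would probably need a non-default trigger_rule such as "all_done"; treat that as an untested assumption.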

Upvotes: 2

Views: 1395

Answers (2)

Hussein Awala

Reputation: 5110

The best solution is the BranchPythonOperator, as mentioned in the other answer. I just tested this DAG in Airflow 1.10.15 and it works fine:

from datetime import datetime, timedelta
from random import choice

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator


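def t2_process(**context):
    # Simulate the four outcomes; replace this with your real processing.
    random_choice = choice(["TaskNegative", "TaskZero", "TaskPositive", "fail"])
    print(random_choice)
    if random_choice == "fail":
        # Raising marks T2 as failed (after its retries), which triggers TaskFail.
        raise Exception("simulated failure")
    # The returned task_id is the branch to follow; the other downstream tasks are skipped.
    return random_choice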


with DAG(
        "test_branch",
        start_date=datetime(2022, 8, 25),
) as dag:
    T1 = PythonOperator(
        task_id="T1",
        python_callable=lambda: print("T1"),
    )
    T2 = BranchPythonOperator(
        task_id="T2",
        python_callable=t2_process,
        retries=1,
        retry_delay=timedelta(minutes=1)
    )
    TaskNegative = PythonOperator(
        task_id="TaskNegative",
        python_callable=lambda: print("TaskNegative"),
    )
    TaskZero = PythonOperator(
        task_id="TaskZero",
        python_callable=lambda: print("TaskZero"),
    )
    TaskPositive = PythonOperator(
        task_id="TaskPositive",
        python_callable=lambda: print("TaskPositive"),
    )
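    TaskFail = PythonOperator(
        task_id="TaskFail",
        python_callable=lambda: print("TaskFail"),
        # Run only when every upstream task (here just T2) has failed.
        trigger_rule="all_failed",
    )
    # Every possible branch must be a direct downstream task of the BranchPythonOperator.
    T1 >> T2 >> [TaskNegative, TaskZero, TaskPositive, TaskFail]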

I simulated the 4 cases using random.choice; you can replace the callable with your own processing. For TaskFail: T2 retries once after the first failure, and if it fails again, TaskFail is triggered because of its trigger_rule="all_failed".
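To illustrate why TaskFail waits through T2's retry and only runs after the final failure, here is a simplified model of the "all_failed" rule in plain Python. It is not Airflow's implementation: the function name `all_failed_should_run` is hypothetical, and treating "failed" and "upstream_failed" as the failure states is a simplifying assumption.

```python
from typing import List

# Upstream states that count as "failed" in this simplified model.
FAILED_STATES = {"failed", "upstream_failed"}
# States after which an upstream task will not change again (simplified).
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def all_failed_should_run(upstream_states: List[str]) -> bool:
    """Simplified model of trigger_rule="all_failed": run once every upstream task has failed."""
    # Keep waiting while any upstream task is still running or waiting to retry.
    if any(s not in FINISHED_STATES for s in upstream_states):
        return False
    # Every finished upstream task must have failed.
    return all(s in FAILED_STATES for s in upstream_states)
```

With T2 as the only upstream task, ["up_for_retry"] means "wait", ["failed"] means "run TaskFail", and ["success"] means TaskFail does not run.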

Upvotes: 2

Daniel Huang

Reputation: 6558

I think the BranchPythonOperator is what you're looking for. Like the PythonOperator, it takes a python_callable, but it expects the callable to return one or more task IDs. Make sure all the possible next tasks are set as direct downstream dependencies of the branch task: it will follow the returned tasks and skip the rest.
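The "follow the returned tasks, skip the rest" behaviour can be sketched as a small, simplified model in plain Python. The name `split_branches` is hypothetical and this is not Airflow's code; rejecting IDs that are not direct downstream tasks is a choice made here to reflect the advice above, and real Airflow versions may handle that case differently.

```python
from typing import Iterable, List, Tuple, Union


def split_branches(
    returned: Union[str, Iterable[str]], downstream_ids: List[str]
) -> Tuple[List[str], List[str]]:
    """Simplified model of a branch: which direct downstream tasks run and which are skipped."""
    # The callable may return a single task_id or a collection of task_ids.
    chosen = {returned} if isinstance(returned, str) else set(returned)
    # In this model, every returned id must be a direct downstream task.
    unknown = chosen - set(downstream_ids)
    if unknown:
        raise ValueError(f"not downstream of the branch task: {sorted(unknown)}")
    follow = [t for t in downstream_ids if t in chosen]
    skip = [t for t in downstream_ids if t not in chosen]
    return follow, skip
```

For example, returning "TaskZero" with downstream tasks ["TaskNegative", "TaskZero", "TaskPositive", "TaskFail"] follows ["TaskZero"] and skips the other three.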

Upvotes: 2
