I'm not sure whether this question belongs here, on Meta, or on another Stack Exchange site (let me know and I'll move it).
I was wondering whether there is a way to build a DAG (in Python) in Airflow using the PythonOperator
(or another operator that works with Airflow 1.0 on Docker) so that its python_callable
decides which task runs next, based on the status of the previous tasks.
I can already get the statuses of all the tasks. What I'm wondering is how to make the DAG decide, based on those upstream statuses, which task to go to next.
I've searched the web for a while now and haven't found anything very helpful.
I'm using Airflow 1.1 on Docker.
I don't have a code example because that's what I'm trying to figure out, but here is the basic idea:
TaskA >> TaskB
if TaskB <= -1 go to TaskNegative
if TaskB == 0 go to TaskZero
if TaskB >= 1 go to TaskPositive
if TaskB == fail/null go to TaskFail
if TaskA == fail/null go to TaskC (TaskC would then do with TaskD what TaskA did with TaskB, for example)
That is a very basic version of what I'll need to implement (there will be many more options, conditions, etc.). I can write all of that logic in Python, but I need a way to plug it into an operator of some sort (again, one that works on Airflow 1.0 on Docker).
Upvotes: 2
Reputation: 5110
The best solution is to use BranchPythonOperator,
as mentioned in the other answer. I just tested the following DAG in Airflow 1.10.15
and it works fine:
from datetime import datetime, timedelta
from random import choice

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator


def t2_process(**context):
    # Simulate T2's outcome: pick one of the branches, or fail on purpose
    random_choice = choice(["TaskNegative", "TaskZero", "TaskPositive", "fail"])
    print(random_choice)
    if random_choice == "fail":
        raise Exception("simulated T2 failure")
    # The returned task_id is the branch that runs; the other downstream tasks are skipped
    return random_choice


with DAG(
    "test_branch",
    start_date=datetime(2022, 8, 25),
) as dag:
    T1 = PythonOperator(
        task_id="T1",
        python_callable=lambda: print("T1"),
    )

    T2 = BranchPythonOperator(
        task_id="T2",
        python_callable=t2_process,
        retries=1,
        retry_delay=timedelta(minutes=1),
    )

    TaskNegative = PythonOperator(
        task_id="TaskNegative",
        python_callable=lambda: print("TaskNegative"),
    )

    TaskZero = PythonOperator(
        task_id="TaskZero",
        python_callable=lambda: print("TaskZero"),
    )

    TaskPositive = PythonOperator(
        task_id="TaskPositive",
        python_callable=lambda: print("TaskPositive"),
    )

    # Runs only when T2 has failed (after its retry)
    TaskFail = PythonOperator(
        task_id="TaskFail",
        python_callable=lambda: print("TaskFail"),
        trigger_rule="all_failed",
    )

    # Every possible branch must be a direct downstream task of T2
    T1 >> T2 >> [TaskNegative, TaskZero, TaskPositive, TaskFail]
I simulated the 4 cases using random.choice; you can replace the callable with your own processing logic. For TaskFail: T2 will retry once after the first failure, and if it fails again, TaskFail will be triggered because of its trigger_rule (all_failed).
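To map this onto the conditions in the question, here is a minimal sketch of a branch callable that reads TaskB's result instead of choosing at random. It assumes (hypothetically) that TaskB is a PythonOperator that returns a number, which Airflow pushes to XCom, and that the branch task sits directly after TaskB with TaskNegative, TaskZero, TaskPositive and TaskFail as its downstream tasks. The helper names choose_branch and branch_callable are made up for illustration. It reads the question's conditions as negative / zero / positive and routes a missing (None) result to TaskFail. On Airflow 1.10 the BranchPythonOperator also needs provide_context=True so that ti is passed to the callable.

```python
# Hypothetical branch callable for the question's TaskB -> Task* routing.
# Assumes an upstream task with task_id "TaskB" returned a number (stored in
# XCom) and that the four task IDs below are direct downstream tasks.


def choose_branch(value):
    """Map TaskB's result to the task_id that should run next."""
    if value is None:
        # TaskB produced no result: treat it as the "null" case
        return "TaskFail"
    if value < 0:
        return "TaskNegative"
    if value == 0:
        return "TaskZero"
    return "TaskPositive"


def branch_callable(**context):
    """python_callable for a BranchPythonOperator placed after TaskB."""
    # On Airflow 1.10, "ti" is only in the context if provide_context=True
    ti = context["ti"]
    value = ti.xcom_pull(task_ids="TaskB")
    return choose_branch(value)


if __name__ == "__main__":
    # Local check without Airflow, using a stand-in for the TaskInstance
    class FakeTI:
        def __init__(self, value):
            self.value = value

        def xcom_pull(self, task_ids):
            return self.value

    for v in (-3, 0, 7, None):
        print(v, "->", branch_callable(ti=FakeTI(v)))
```

If TaskB actually fails, a branch task with the default trigger_rule (all_success) will not run at all, so a real failure still has to be handled with a trigger rule as in the DAG above. Also, a TaskFail selected by the branch for the null case should keep the default trigger_rule, because all_failed does not fire after a successful branch task.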
Upvotes: 2
Reputation: 6558
I think BranchPythonOperator is what you're looking for. Like PythonOperator, it takes a python_callable, but it expects that callable to return one or more task IDs. Just make sure all the possible next tasks are set as direct downstream dependencies of this task. It will then run the returned tasks and skip the rest.
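A rough way to picture the "run the returned tasks, skip the rest" rule is the toy model below. It is not Airflow's implementation, just plain Python with a made-up helper, split_downstream. It accepts either a single task ID or a list of IDs, and it raises ValueError when a returned ID is not a direct downstream task, to make the "set all possible next tasks as downstream" requirement explicit (how Airflow itself reacts to such an ID may differ between versions).

```python
# Toy model (not Airflow code) of how a branch task's return value splits
# its direct downstream tasks into "run" and "skipped".


def split_downstream(downstream_ids, branch_result):
    """Return (to_run, to_skip) for a branch callable's return value.

    branch_result may be a single task_id string or a list of task_ids.
    """
    if isinstance(branch_result, str):
        chosen = {branch_result}
    else:
        chosen = set(branch_result)
    unknown = chosen - set(downstream_ids)
    if unknown:
        # In this model, returned IDs must be direct downstream tasks
        raise ValueError(f"not downstream of the branch task: {sorted(unknown)}")
    to_run = [t for t in downstream_ids if t in chosen]
    to_skip = [t for t in downstream_ids if t not in chosen]
    return to_run, to_skip


if __name__ == "__main__":
    downstream = ["TaskNegative", "TaskZero", "TaskPositive", "TaskFail"]
    print(split_downstream(downstream, "TaskZero"))
    print(split_downstream(downstream, ["TaskZero", "TaskPositive"]))
```

In the real operator, the skipped tasks are marked as skipped, and tasks further downstream are then handled according to their own trigger_rule.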
Upvotes: 2