Reputation: 349
My Airflow test_dag looks like:
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

dag = DAG(
    dag_id='test_dag',
    default_args=some_args,
    catchup=False,
    schedule_interval='0 10 * * *'
)
dummy_step_one = PythonOperator(
    task_id='dummy_step_one',
    provide_context=True,
    python_callable=dummy_step_one,
    dag=dag,
    retries=5
)

branch_test = BranchPythonOperator(
    task_id='branch_test',
    provide_context=True,
    python_callable=branch_test,
    dag=dag,
    retries=3,
    retry_delay=timedelta(seconds=5)
)

dummy_step_two = PythonOperator(
    task_id='dummy_step_two',
    provide_context=True,
    python_callable=dummy_step_two,
    dag=dag,
    retries=5
)

dummy_step_three = PythonOperator(
    task_id='dummy_step_three',
    provide_context=True,
    python_callable=dummy_step_three,
    dag=dag,
    retries=5
)

dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    provide_context=True,
    python_callable=dummy_step_four,
    dag=dag,
    retries=5
)
dummy_step_one >> branch_test
branch_test >> dummy_step_two >> dummy_step_three >> dummy_step_four
branch_test >> dummy_step_four
I am importing each of these python callables from another file, and they look like:
def dummy_step_one(**context: dict) -> str:
    return print('hello world')

def dummy_step_two(**context: dict) -> str:
    return print('hello world')

def dummy_step_three(**context: dict) -> str:
    return print('hello world')

def dummy_step_four(**context: dict) -> str:
    return print('hello world')

def branch_test(**context: dict) -> str:
    return 'dummy_step_four'
So in theory, this should run step_one, then run branch_test, then skip step_two and step_three because branch_test returns 'dummy_step_four', and finally run step_four. The graph view of this looks correct. However, when this runs, step_four is skipped, as the graph and tree views show (ignore the first successful run). Why?
Important note: this ran successfully on Apache Airflow 1.10.12, but we recently upgraded to 1.10.15 in preparation for the upgrade to 2.0, so the issue I'm facing is likely related. However, I can't find any documentation online describing a bug in the Python branch operator in 1.10.15.
Upvotes: 1
Views: 1336
Reputation: 16119
I believe your code has a bug in the pipeline logic. BranchPythonOperator is expected to return the task_id of the task to follow.
In your case you have:

def branch_test(**context: dict) -> str:
    return 'dummy_step_four'
which means it will always follow dummy_step_four and always skip dummy_step_two (and, since dummy_step_three is downstream of the skipped task, it is skipped in cascade). However, you also set:

dummy_step_two >> dummy_step_three >> dummy_step_four
This means that the upstream tasks of dummy_step_four are dummy_step_three in state SKIPPED and branch_test in state SUCCESS.
The default trigger rule for tasks is all_success, which means the condition for dummy_step_four is not met, because one of its parents is skipped; thus dummy_step_four will also be skipped.
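For reference, the implicit default is equivalent to setting the rule explicitly; a minimal sketch using the TriggerRule constants (available in both 1.10.x and 2.x):

from airflow.utils.trigger_rule import TriggerRule

# Equivalent to the implicit default: run only when every direct
# upstream task finished in state "success"; a skipped parent
# therefore causes this task to be skipped too.
dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    python_callable=dummy_step_four,
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,  # the default
)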
To fix the issue you need to change the trigger rule of the dummy_step_four task.
For Airflow < 2.2.0 use:

trigger_rule="none_failed_or_skipped"

(Yes, this is the right trigger rule. The name is misleading, which is why it was renamed in a later PR.)
For Airflow >= 2.2.0 use:

trigger_rule="none_failed_min_one_success"
So the operator code needs to be:

dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    provide_context=True,
    python_callable=dummy_step_four,
    dag=dag,
    retries=5,
    # on Airflow < 2.2.0 use trigger_rule="none_failed_or_skipped" instead
    trigger_rule="none_failed_min_one_success",
)
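Separately, note that your branch_test callable always returns the same task_id, so the branch is effectively constant. If you want it to actually branch, return different task_ids based on the context; a hypothetical sketch (the weekday condition is made up for illustration):

def branch_test(**context: dict) -> str:
    # Hypothetical condition: take the long path on weekdays,
    # jump straight to step four on weekends.
    if context['execution_date'].weekday() < 5:
        return 'dummy_step_two'
    return 'dummy_step_four'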
Note: for your scenario the all_done trigger rule will also work (if you prefer). I didn't use it in the example because the official docs also use none_failed_min_one_success for this exact scenario.
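If you do prefer all_done, the change is just the rule string; a sketch of the same operator (note the behavioral difference: with all_done the task also runs when an upstream task failed):

dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    provide_context=True,
    python_callable=dummy_step_four,
    dag=dag,
    retries=5,
    trigger_rule="all_done",  # run once all upstream tasks finished, whatever their state
)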
Upvotes: 2