fjjones88

Reputation: 349

Airflow Python Branch Operator not working in 1.10.15

My airflow test_dag looks like:

dag = DAG(
    dag_id='test_dag',
    default_args=some_args,
    catchup=False,
    schedule_interval='0 10 * * *'
)

dummy_step_one = PythonOperator(
    task_id='dummy_step_one',
    provide_context=True,
    python_callable=dummy_step_one,
    dag=dag,
    retries=5
)

branch_test = BranchPythonOperator(
    task_id='branch_test',
    provide_context=True,
    python_callable=branch_test,
    dag=dag,
    retries=3,
    retry_delay=timedelta(seconds=5)
)

dummy_step_two = PythonOperator(
    task_id='dummy_step_two',
    provide_context=True,
    python_callable=dummy_step_two,
    dag=dag,
    retries=5
)

dummy_step_three = PythonOperator(
    task_id='dummy_step_three',
    provide_context=True,
    python_callable=dummy_step_three,
    dag=dag,
    retries=5
)

dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    provide_context=True,
    python_callable=dummy_step_four,
    dag=dag,
    retries=5
)

dummy_step_one >> branch_test
branch_test >> dummy_step_two >> dummy_step_three >> dummy_step_four
branch_test >> dummy_step_four

I am importing each of these python callables from another file, and they look like:

def dummy_step_one(**context: dict) -> None:

    print('hello world')

def dummy_step_two(**context: dict) -> None:

    print('hello world')

def dummy_step_three(**context: dict) -> None:

    print('hello world')

def dummy_step_four(**context: dict) -> None:

    print('hello world')

def branch_test(**context: dict) -> str:

    return 'dummy_step_four'

So in theory, this should run dummy_step_one, then branch_test, then skip dummy_step_two and dummy_step_three because branch_test returns 'dummy_step_four', and finally run dummy_step_four. The graph view of this looks correct:

[graph view screenshot]
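In plain Python (not Airflow code), the skip behavior I'm expecting from BranchPythonOperator can be sketched like this — `resolve_branch` is a hypothetical helper for illustration only:

```python
# Sketch of how BranchPythonOperator decides which of its *direct* downstream
# tasks to follow: the callable's return value names the task_id(s) to keep,
# and every other direct child is skipped.
def resolve_branch(branch_return, downstream_task_ids):
    # the callable may return a single task_id or a list of them
    followed = {branch_return} if isinstance(branch_return, str) else set(branch_return)
    return {t: "follow" if t in followed else "skip" for t in downstream_task_ids}

# branch_test returns 'dummy_step_four', and its direct children are
# dummy_step_two and dummy_step_four:
decision = resolve_branch("dummy_step_four", ["dummy_step_two", "dummy_step_four"])
# decision == {'dummy_step_two': 'skip', 'dummy_step_four': 'follow'}
```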

However, when this runs, dummy_step_four is skipped, as shown in the graph view above and the tree view below (ignore the first successful run). Why?

[tree view screenshot]

Here is what is printed in the log of the branch_test step:

[branch_test log screenshot]

Important note: this ran successfully on Apache Airflow 1.10.12, but we recently upgraded to 1.10.15 in preparation for the upgrade to 2.0 -- so the issue I'm facing is likely related, but I can't find any documentation online that details a bug with the Python branch operator in 1.10.15.

Upvotes: 1

Views: 1336

Answers (1)

Elad Kalif

Reputation: 16119

I believe your code has a bug in the pipeline logic.

BranchPythonOperator is expected to return the task_id to follow. In your case you have:

def branch_test(**context: dict) -> str:

    return 'dummy_step_four'

which means that it will always follow dummy_step_four and always skip dummy_step_two. However, you also set:

dummy_step_two >> dummy_step_three >> dummy_step_four

This means that dummy_step_four's upstream tasks are:

dummy_step_three in state SKIPPED

branch_test in state SUCCESS

The default trigger rule for tasks is all_success, which means the condition isn't met for dummy_step_four: one of its parents is skipped, so dummy_step_four is skipped as well.
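The evaluation above can be sketched in plain Python (this is an illustration of the rule semantics, not Airflow's internal implementation):

```python
def all_success(upstream_states):
    # default trigger rule: every direct upstream must have succeeded
    return all(s == "success" for s in upstream_states)

def none_failed_or_skipped(upstream_states):
    # despite its name: no upstream failed AND at least one succeeded
    return ("failed" not in upstream_states
            and "upstream_failed" not in upstream_states
            and "success" in upstream_states)

# dummy_step_four's upstreams after branching:
# dummy_step_three was skipped, branch_test succeeded
states = ["skipped", "success"]
all_success(states)             # False -> dummy_step_four is skipped
none_failed_or_skipped(states)  # True  -> dummy_step_four would run
```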

To fix the issue, you need to change the trigger rule on the dummy_step_four task.

For Airflow < 2.2.0 use:

trigger_rule="none_failed_or_skipped"

(Yes, this is the right trigger rule; the name is misleading, which is why it was later renamed in a PR.)

For Airflow >= 2.2.0 use:

trigger_rule="none_failed_min_one_success"

So the operator code needs to be as follows (shown with the Airflow >= 2.2.0 rule name; on 1.10.15, use "none_failed_or_skipped" instead):

dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    provide_context=True,
    python_callable=dummy_step_four,
    dag=dag,
    retries=5,
    trigger_rule="none_failed_min_one_success",
)

Example:

[example run screenshot]

Note: for your scenario the all_done trigger rule would also work, if you prefer. I didn't use it in the example because the official docs use none_failed_min_one_success for this exact scenario.
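For comparison, a sketch of how the two rules differ (again plain Python, not Airflow internals): all_done fires once every upstream has reached a terminal state, even after a failure, whereas none_failed_min_one_success will not.

```python
def all_done(upstream_states):
    # fires once every upstream has finished, regardless of outcome
    terminal = {"success", "failed", "skipped", "upstream_failed"}
    return all(s in terminal for s in upstream_states)

def none_failed_min_one_success(upstream_states):
    # no upstream failed AND at least one succeeded
    return ("failed" not in upstream_states
            and "upstream_failed" not in upstream_states
            and "success" in upstream_states)

states = ["skipped", "failed"]
all_done(states)                     # True  -> task would still run after a failure
none_failed_min_one_success(states)  # False -> task would not run
```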

Upvotes: 2
