user2715877

Airflow BranchPythonOperator - Continue After Branch

I have the operators shown below. The Graph View looks correct, but the DAG does not behave as expected: execution never continues past either branch. Regardless of the date, neither path goes on to task_05.

The DAG has two (2) paths:

(1) If it is the first of the month, task_01->test_step->task_02->task_05->task_06

(2) If it is not the first of the month, task_01->test_step->task_03->task_04->task_05->task_06
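The two intended paths can be written down as a small, hypothetical Python helper. intended_path is an illustrative name, not an Airflow API; it applies the same ds_nodash check as dom_branch and shows that task_05 and task_06 are expected to run on every date.

```python
# Hypothetical helper (not part of Airflow or the original DAG) that lists the
# tasks expected to run for a given logical date, mirroring dom_branch's check
# on the last two characters of ds_nodash (format YYYYMMDD).
from typing import List


def intended_path(ds_nodash: str) -> List[str]:
    if ds_nodash[-2:] == "01":
        # Path (1): first of the month
        middle = ["task_02"]
    else:
        # Path (2): any other day
        middle = ["task_03", "task_04"]
    return ["task_01", "test_step", *middle, "task_05", "task_06"]


print(intended_path("20200501"))
# ['task_01', 'test_step', 'task_02', 'task_05', 'task_06']
print(intended_path("20200502"))
# ['task_01', 'test_step', 'task_03', 'task_04', 'task_05', 'task_06']
```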

Problem: the DAG never runs all the way through to task_06.

I assume the problem lies in the way I am using the bitshift (>>) operators.

Code

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import DAG


default_args = {
    'owner': 'astro',
    'depends_on_past': False,
    'start_date': datetime(2020, 5, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='astro_test',
    default_args=default_args,
    schedule_interval="0 10 * * *",
    catchup=True,
    max_active_runs=1
)

def dom_branch(ds_nodash, **kwargs):

    if ds_nodash[-2:] == '01':
        return "task_02"
    else:
        return "task_03"

with dag:
    one = DummyOperator(task_id='task_01')
    two = DummyOperator(task_id='task_02')
    three = DummyOperator(task_id='task_03')
    four = DummyOperator(task_id='task_04')
    five = DummyOperator(task_id='task_05')
    six = DummyOperator(task_id='task_06')

    dom_operator = BranchPythonOperator(
        task_id="test_step",
        provide_context=True,
        op_kwargs={'ds_nodash': '{{ ds_nodash }}'},
        python_callable=dom_branch
    )

    one >> dom_operator >> [two, three] 
    two >> five
    three >> four >> five   
    five >> six

[Screenshot: Graph View of the DAG (1st of the month)]

[Screenshot: Graph View of the DAG (not the 1st of the month)]

Answers (1)

drum

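This happens because task_05 requires both of its upstream tasks (task_02 and task_04) to succeed before it can start; that is the default trigger rule, all_success. Since the branch always skips one of the two paths, task_05 is skipped as well, and so is task_06. Instead, you can add a trigger rule: https://airflow.apache.org/docs/stable/concepts.html#trigger-rules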

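from airflow.utils.trigger_rule import TriggerRule

five = DummyOperator(
    task_id='task_05',
    # Run as soon as at least one upstream (task_02 or task_04) succeeds
    trigger_rule=TriggerRule.ONE_SUCCESS,
)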
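To illustrate why task_05 never ran and why ONE_SUCCESS fixes it, here is a hypothetical, heavily simplified model of trigger-rule evaluation. evaluate is an illustrative function, not Airflow's implementation or API, and it only models the success and skipped states. Because the branch always skips exactly one of task_05's upstreams (task_02 or task_04), all_success skips task_05 on every date, while one_success lets it run.

```python
# Hypothetical, simplified model of trigger-rule evaluation for a task whose
# upstream tasks have all finished. This is NOT Airflow's implementation; it
# only models the "success" and "skipped" states relevant to this DAG.
from typing import List


def evaluate(trigger_rule: str, upstream_states: List[str]) -> str:
    if any(state not in ("success", "skipped") for state in upstream_states):
        raise ValueError("only 'success' and 'skipped' states are modeled")
    successes = upstream_states.count("success")
    if trigger_rule == "all_success":
        # Default rule: every upstream must succeed; a skipped upstream
        # causes this task to be skipped as well.
        return "run" if successes == len(upstream_states) else "skipped"
    if trigger_rule == "one_success":
        # At least one successful upstream is enough.
        return "run" if successes >= 1 else "skipped"
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# Upstream states of task_05, ordered as (task_02, task_04).
first_of_month = ["success", "skipped"]
other_day = ["skipped", "success"]

for rule in ("all_success", "one_success"):
    print(rule, evaluate(rule, first_of_month), evaluate(rule, other_day))
# all_success skipped skipped
# one_success run run
```

task_06 can keep the default rule: its only upstream is task_05, so it runs once task_05 succeeds.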
