Reputation: 543
I have the operators shown below. The graph view looks correct, but the DAG does not run as intended: neither branch continues past its own tasks. Regardless of the date, neither path reaches task_05.
The DAG has two (2) paths:
(1) If it is the first of the month, task_01->test_step->task_02->task_05->task_06
(2) If it is not the first of the month, task_01->test_step->task_03->task_04->task_05->task_06
Problem: The DAG never runs all the way through task_06.
I assume the problem lies in the way I am using the bitwise operators (>>).
Code
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import DAG
default_args = {
    'owner': 'astro',
    'depends_on_past': False,
    'start_date': datetime(2020, 5, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    dag_id='astro_test',
    default_args=default_args,
    schedule_interval="0 10 * * *",
    catchup=True,
    max_active_runs=1
)
def dom_branch(ds_nodash, **kwargs):
    if ds_nodash[-2:] == '01':
        return "task_02"
    else:
        return "task_03"
with dag:
    one = DummyOperator(task_id='task_01')
    two = DummyOperator(task_id='task_02')
    three = DummyOperator(task_id='task_03')
    four = DummyOperator(task_id='task_04')
    five = DummyOperator(task_id='task_05')
    six = DummyOperator(task_id='task_06')
    dom_operator = BranchPythonOperator(
        task_id="test_step",
        provide_context=True,
        op_kwargs={'ds_nodash': '{{ ds_nodash }}'},
        python_callable=dom_branch
    )
    one >> dom_operator >> [two, three]
    two >> five
    three >> four >> five
    five >> six
Upvotes: 1
Views: 3434
Reputation: 5661
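This happens because task_05 waits for all of its upstream tasks to succeed before it can start (the default trigger rule is all_success), and the branch always skips one of them.
Instead, you can add a trigger rule: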
https://airflow.apache.org/docs/stable/concepts.html#trigger-rules
from airflow.utils.trigger_rule import TriggerRule

five = DummyOperator(
    task_id='task_05',
    trigger_rule=TriggerRule.ONE_SUCCESS,
)
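To see why the default rule blocks task_05 on both dates, here is a rough sketch in plain Python. It is a simplified, hypothetical model of how the two trigger rules treat upstream states, not Airflow's actual scheduler code. The helper names `should_run` and `upstream_of_task_05` are made up for illustration.

```python
# Simplified, hypothetical model of trigger-rule evaluation.
# This is NOT Airflow's real implementation; it only mirrors the idea.
SUCCESS, SKIPPED = "success", "skipped"


def should_run(trigger_rule, upstream_states):
    """Return True if a task with this rule would run, given its direct upstream states."""
    if trigger_rule == "all_success":
        return all(state == SUCCESS for state in upstream_states)
    if trigger_rule == "one_success":
        return any(state == SUCCESS for state in upstream_states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


def upstream_of_task_05(ds_nodash):
    """States of task_02 and task_04 once the branch has picked a path."""
    first_of_month = ds_nodash[-2:] == "01"
    task_02 = SUCCESS if first_of_month else SKIPPED
    task_04 = SKIPPED if first_of_month else SUCCESS
    return [task_02, task_04]


for ds in ("20200501", "20200502"):
    states = upstream_of_task_05(ds)
    print(ds, states,
          "all_success:", should_run("all_success", states),
          "one_success:", should_run("one_success", states))
```

In this model, one of the two upstream tasks of task_05 is always skipped, so all_success never passes, while one_success passes on either date. Keep in mind that in Airflow one_success fires as soon as a single upstream task succeeds and does not wait for the others, which is fine here because only one path ever runs.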
Upvotes: 5