Reputation: 506
I have the following DAG:
import logging

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup


def select_next_branch():
    if some_condition:
        next_task_ = 'tasks.inner'
    else:
        next_task_ = 'end'
    logging.info(f'next_task: {next_task_}')
    return next_task_


with DAG(dag_id='poc_branch_tasks',
         description='Branching with task group POC',
         schedule_interval=None,
         start_date=days_ago(1),
         tags=['poc', 'branch', 'task_group']) as dag:
    start = DummyOperator(task_id='start')
    branch = BranchPythonOperator(task_id='branch',
                                  python_callable=select_next_branch)
    with TaskGroup(group_id='tasks') as task_group:
        inner_one = DummyOperator(task_id='inner')
    end = DummyOperator(task_id='end')
    start >> branch
    branch >> end
    branch >> task_group >> end
When some_condition is satisfied, the flow goes correctly from branch to tasks.inner. Otherwise it should go from branch to end, but instead of being executed, end is skipped. What am I doing wrong?
Thanks in advance.
Upvotes: 0
Views: 1645
Reputation: 9308
Try adding trigger_rule='one_success' to the end task. The default trigger_rule is all_success:
all_success (default): All upstream tasks have succeeded
However, your end task depends on both the branch operator and the inner task. When the inner task is skipped, end cannot be triggered, because one of its upstream tasks is not in the "success" state.
With the one_success trigger rule, end runs as soon as either the branch operator or the inner task has succeeded.
end = DummyOperator(task_id='end', trigger_rule='one_success')
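To illustrate why the rule matters here, below is a minimal pure-Python sketch of how the two trigger rules treat the upstream states of end. It is a simplified model, not Airflow's actual scheduler code: the function name decide_task_state and the plain string states are hypothetical, and it assumes all upstream tasks have already finished.

```python
def decide_task_state(trigger_rule, upstream_states):
    """Simplified model of a trigger rule (hypothetical helper, not an Airflow API).

    upstream_states: finished upstream states, each one of
    "success", "failed" or "skipped".
    Returns "run", "skipped" or "upstream_failed".
    """
    states = list(upstream_states)

    if trigger_rule == "all_success":
        # Any failed upstream blocks the task; any skipped upstream
        # causes the skip to propagate to this task.
        if "failed" in states:
            return "upstream_failed"
        if "skipped" in states:
            return "skipped"
        return "run"

    if trigger_rule == "one_success":
        # A single successful upstream is enough to run.
        if "success" in states:
            return "run"
        if "failed" in states:
            return "upstream_failed"
        return "skipped"

    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# Scenario from the question: branch succeeded and returned 'end',
# so tasks.inner was skipped.
upstream_of_end = ["success", "skipped"]

print(decide_task_state("all_success", upstream_of_end))
print(decide_task_state("one_success", upstream_of_end))
```

Under this model, the default rule leaves end skipped as soon as tasks.inner is skipped, while one_success lets it run because branch itself succeeded.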
Upvotes: 2