tristobal

Reputation: 506

Problem with BranchPythonOperator to TaskGroup

I have the following DAG:

import logging

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup


def select_next_branch():
    if some_condition:
        next_task_ = 'tasks.inner'
    else:
        next_task_ = 'end'
    logging.info(f'next_task: {next_task_}')
    return next_task_


with DAG(dag_id='poc_branch_tasks',
         description='Branching with task group POC',
         schedule_interval=None,
         start_date=days_ago(1),
         tags=['poc', 'branch', 'task_group']) as dag:

    start = DummyOperator(task_id='start')

    branch = BranchPythonOperator(task_id='branch',
                                  python_callable=select_next_branch)

    with TaskGroup(group_id='tasks') as task_group:
        inner_one = DummyOperator(task_id='inner')

    end = DummyOperator(task_id='end')

    start >> branch
    branch >> end
    branch >> task_group >> end

When some_condition is satisfied, the flow goes correctly from branch to tasks.inner. Otherwise it should go from branch straight to end, but end is skipped instead of being executed. What am I doing wrong?

Thanks in advance.

Upvotes: 0

Views: 1645

Answers (1)

Emma

Reputation: 9308

Try setting trigger_rule='one_success' on the end task. The default trigger_rule is all_success:

all_success (default): All upstream tasks have succeeded

However, your end task depends on both the branch operator and the inner task. When the inner task is skipped, end cannot be triggered, because one of its upstream tasks is not in the "success" state; under all_success the skip propagates and end is skipped as well.

With the trigger rule one_success, end is executed if at least one of its upstream tasks (the branch operator or the inner task) has succeeded.

end = DummyOperator(task_id='end', trigger_rule='one_success')
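
To make the difference between the two rules concrete, here is a minimal sketch in plain Python. It is a simplified model of the decision, not Airflow's actual scheduler code: the function name task_runs and the plain-string states are assumptions made for illustration, and only the two rules discussed here are modelled.

```python
def task_runs(trigger_rule, upstream_states):
    """Decide whether a task would run, given the final states of its upstream tasks.

    Hypothetical helper for illustration only; it models just two trigger rules.
    """
    states = list(upstream_states)
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded; one skipped task blocks the run.
        return all(state == "success" for state in states)
    if trigger_rule == "one_success":
        # A single successful upstream task is enough.
        return any(state == "success" for state in states)
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")


# Upstream states of 'end' when the callable returns 'end':
# 'branch' succeeds and 'tasks.inner' is skipped.
else_branch = {"branch": "success", "tasks.inner": "skipped"}

# Upstream states of 'end' when the callable returns 'tasks.inner'.
if_branch = {"branch": "success", "tasks.inner": "success"}

for rule in ("all_success", "one_success"):
    print(rule,
          "| else path runs end:", task_runs(rule, else_branch.values()),
          "| if path runs end:", task_runs(rule, if_branch.values()))
```

One caveat: as far as I know, one_success does not wait for all upstream tasks to finish, so on the tasks.inner path end may start before inner is done. If that matters, the none_failed_min_one_success rule (Airflow 2.2+) is worth a look, since it requires that no upstream task failed and at least one succeeded.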

Upvotes: 2
