Ashika Umanga Umagiliya

Reputation: 9158

Airflow : Skip a task using Branching

In my DAG, I want to skip a task (oracle_merge_hist_orig) depending on a flag.

My logic is:

When oracle_branch=True, execute [merge_op, update_table_op, table_count_op].

When oracle_branch=False, execute [update_table_op, table_count_op].

I tried to use BranchPythonOperator as follows:

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

# Flag that decides whether oracle_merge_hist_orig should run
oracle_branch = True

def branch_func():
    # Return the task_id of the branch to follow; the other branch is skipped
    if oracle_branch:
        return "oracle_branch"
    else:
        return "normal_branch"

dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

branching_op = BranchPythonOperator(
    task_id='branch_shall_run_oracle_merge_original_hist',
    python_callable=branch_func,
    dag=dag,
)

# Named *_op so they do not overwrite the oracle_branch flag read by branch_func
oracle_branch_op = DummyOperator(
    task_id='oracle_branch',
    dag=dag,
)

normal_branch_op = DummyOperator(
    task_id='normal_branch',
    dag=dag,
)

merge_op = DummyOperator(
    task_id='oracle_merge_hist_orig',
    dag=dag,
)

update_table_op = DummyOperator(
    task_id='update_table_job',
    dag=dag,
)

table_count_op = DummyOperator(
    task_id='table_count',
    dag=dag,
)

branching_op >> [oracle_branch_op, normal_branch_op]
normal_branch_op >> update_table_op >> table_count_op
oracle_branch_op >> merge_op >> update_table_op >> table_count_op

However, instead of skipping only that task, it skips the entire path.

How can I fix this so that only the "oracle_merge_hist_orig" task is skipped?

When oracle_branch=False: (screenshot)

When oracle_branch=True: (screenshot)

Upvotes: 3

Views: 5101

Answers (1)

Sai Neelakantam

Reputation: 939

Every task has a trigger_rule, which is set to all_success by default. You can override it with any of the other trigger rules listed in the Airflow documentation.

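In your DAG, the update_table_job task has two upstream tasks (normal_branch and oracle_merge_hist_orig). Whichever path the branch does not choose ends up skipped, so one of its upstream tasks is always in the skipped state. Under all_success, that puts update_table_job into the skipped state as well, and the skip then propagates to table_count. You can avoid this by overriding the default value of trigger_rule with one_success, as below.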

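update_table_op = DummyOperator(
    task_id='update_table_job',
    # Run once at least one upstream task succeeds, instead of the default
    # all_success, which skips this task whenever any upstream task is skipped
    trigger_rule='one_success',
    dag=dag,
)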

Note: I tested this on Airflow 1.10.4.
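To see why the trigger rule makes the difference, here is a simplified pure-Python model of how the skip propagates through this DAG. It is not Airflow code: the helpers `decide` and `simulate` are hypothetical names, only the success and skipped states are modelled (no failures), and each task is decided once all of its upstream tasks have finished. That is enough for this DAG, although a real one_success task can start as soon as one upstream task succeeds.

```python
from typing import Dict, List

# Hypothetical model: upstream task_ids of each task in the question's DAG,
# listed in topological order (dicts keep insertion order in Python 3.7+).
BRANCH_TASK = "branch_shall_run_oracle_merge_original_hist"
UPSTREAM: Dict[str, List[str]] = {
    BRANCH_TASK: [],
    "oracle_branch": [BRANCH_TASK],
    "normal_branch": [BRANCH_TASK],
    "oracle_merge_hist_orig": ["oracle_branch"],
    "update_table_job": ["normal_branch", "oracle_merge_hist_orig"],
    "table_count": ["update_table_job"],
}


def decide(rule: str, upstream_states: List[str]) -> str:
    """Simplified trigger rule: 'success' if the task runs, else 'skipped'."""
    if rule == "all_success":
        # Any skipped upstream task causes this task to be skipped too
        return "success" if all(s == "success" for s in upstream_states) else "skipped"
    if rule == "one_success":
        # One successful upstream task is enough to run
        return "success" if any(s == "success" for s in upstream_states) else "skipped"
    raise ValueError(f"rule not modelled: {rule}")


def simulate(oracle_branch: bool, update_rule: str) -> Dict[str, str]:
    """Final state of every task for a flag value and update_table_job's rule."""
    chosen = "oracle_branch" if oracle_branch else "normal_branch"
    states: Dict[str, str] = {BRANCH_TASK: "success"}
    for task, ups in UPSTREAM.items():
        if task == BRANCH_TASK:
            continue
        if BRANCH_TASK in ups:
            # The branch operator follows the returned task_id and skips the other one
            states[task] = "success" if task == chosen else "skipped"
            continue
        # Every other task keeps the default all_success rule
        rule = update_rule if task == "update_table_job" else "all_success"
        states[task] = decide(rule, [states[u] for u in ups])
    return states


if __name__ == "__main__":
    for rule in ("all_success", "one_success"):
        for flag in (True, False):
            states = simulate(flag, rule)
            ran = [t for t, s in states.items() if s == "success"]
            print(f"{rule}, oracle_branch={flag}: ran {ran}")
```

In this model, with all_success both update_table_job and table_count are skipped for either flag value; with one_success on update_table_job, only oracle_merge_hist_orig is skipped when oracle_branch=False.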

Upvotes: 11
