Reputation: 9158
In my DAG, I want to skip a task (oracle_merge_hist_orig) depending on a flag.
My logic is:
when oracle_branch=True, execute [merge_op, update_table_op, table_count_op]
when oracle_branch=False, execute [update_table_op, table_count_op]
I tried to use BranchPythonOperator as follows:
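import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

oracle_branch = True

def branch_func():
    # Return the task_id of the branch that should run.
    if oracle_branch:
        return "oracle_branch"
    else:
        return "normal_branch"
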
dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

branching_op = BranchPythonOperator(
    task_id='branch_shall_run_oracle_merge_original_hist',
    python_callable=branch_func,
    dag=dag,
)

# The branch markers are named *_op so they do not rebind the
# oracle_branch flag that branch_func reads at run time.
oracle_branch_op = DummyOperator(
    task_id='oracle_branch',
    dag=dag,
)
normal_branch_op = DummyOperator(
    task_id='normal_branch',
    dag=dag,
)
merge_op = DummyOperator(
    task_id='oracle_merge_hist_orig',
    dag=dag,
)
update_table_op = DummyOperator(
    task_id='update_table_job',
    dag=dag,
)
table_count_op = DummyOperator(
    task_id='table_count',
    dag=dag,
)

branching_op >> [oracle_branch_op, normal_branch_op]
normal_branch_op >> update_table_op >> table_count_op
oracle_branch_op >> merge_op >> update_table_op >> table_count_op

However, instead of skipping only that task, it skips the entire path.
How can I fix this so that only the oracle_merge_hist_orig task is skipped?
Upvotes: 3
Views: 5101
Reputation: 939
Every task has a trigger_rule, which is set to all_success by default. You can override it with any of the other values listed here.
In your DAG, the update_table_job task has two upstream tasks. Because one of them is in the skipped state, update_table_job also goes into the skipped state, and so does every task downstream of it. You can avoid this by overriding the default trigger_rule with one_success, as shown below.
update_table_op = DummyOperator(
    task_id='update_table_job',
    trigger_rule='one_success',
    dag=dag,
)
Note: I tested this on Airflow 1.10.4.
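To see why the whole path was skipped, and why one_success changes that, here is a toy model of the two trigger rules in plain Python. It is only a sketch, not Airflow's scheduler code: the names UPSTREAM, resolve and run are made up for this illustration, "branch" stands in for the branching task, and it assumes every task that actually runs succeeds (no failures are modelled).

```python
# Simplified model of trigger-rule evaluation; NOT Airflow's real implementation.
SUCCESS, SKIPPED = "success", "skipped"

# Upstream tasks of each task, mirroring the DAG in the question.
# "branch" is a stand-in for the BranchPythonOperator task.
UPSTREAM = {
    "oracle_branch": ["branch"],
    "normal_branch": ["branch"],
    "oracle_merge_hist_orig": ["oracle_branch"],
    "update_table_job": ["normal_branch", "oracle_merge_hist_orig"],
    "table_count": ["update_table_job"],
}


def resolve(upstream_states, trigger_rule):
    """Return the state a task ends in, given the states of its upstream tasks."""
    if trigger_rule == "all_success":
        # A single skipped upstream task is enough to skip this task as well.
        return SUCCESS if all(s == SUCCESS for s in upstream_states) else SKIPPED
    if trigger_rule == "one_success":
        # One successful upstream task is enough to run this task.
        return SUCCESS if any(s == SUCCESS for s in upstream_states) else SKIPPED
    raise ValueError("trigger rule not covered by this sketch: " + trigger_rule)


def run(chosen_branch, rules):
    """Simulate one DAG run; `rules` maps task_id -> trigger rule override."""
    states = {"branch": SUCCESS}
    # The branch task skips each direct downstream task it did not choose.
    for task in ("oracle_branch", "normal_branch"):
        states[task] = SUCCESS if task == chosen_branch else SKIPPED
    # Remaining tasks, in dependency order; the default rule is all_success.
    for task in ("oracle_merge_hist_orig", "update_table_job", "table_count"):
        upstream_states = [states[u] for u in UPSTREAM[task]]
        states[task] = resolve(upstream_states, rules.get(task, "all_success"))
    return states


default = run("oracle_branch", {})
fixed = run("oracle_branch", {"update_table_job": "one_success"})
print(default["update_table_job"], default["table_count"])  # skipped skipped
print(fixed["update_table_job"], fixed["table_count"])      # success success
```

With the override in place, choosing normal_branch in this model skips only oracle_merge_hist_orig, while update_table_job and table_count still run, which is the behaviour asked for in the question.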
Upvotes: 11