Reputation: 8790
I am learning Airflow and looked at one of the example DAGs that ship with Airflow (example_branch_python_dop_operator_3.py).
In this example, the DAG takes one branch if the minute of the execution datetime is even, and another branch if the minute is odd. Additionally, the DAG has depends_on_past set to True as a default value for all the tasks. The full code is:
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': True,
}

# BranchPython operator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
    dag_id='example_branch_dop_operator_v3',
    schedule_interval='*/1 * * * *',
    default_args=args,
)


def should_run(**kwargs):
    # Return the task_id of the branch to follow for this run.
    print('------------- exec dttm = {} and minute = {}'.
          format(kwargs['execution_date'], kwargs['execution_date'].minute))
    if kwargs['execution_date'].minute % 2 == 0:
        return "dummy_task_1"
    else:
        return "dummy_task_2"


cond = BranchPythonOperator(
    task_id='condition',
    provide_context=True,
    python_callable=should_run,
    dag=dag,
)

dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)

# The branch not returned by should_run is marked as skipped.
cond >> [dummy_task_1, dummy_task_2]
Since depends_on_past is True, I would have expected that after the first DAG Run the tasks would no longer be able to start. Each task would look at the status of the same task in the previous run, see that it was skipped (which is not success), and essentially hang without a status.
However, that is not what happened. Here are the results in Tree View:
As you can see, all of the selected tasks are running in every DAG Run. Why is this happening? Do I misunderstand what depends_on_past means? I thought each task looked at the status of the task with the same task_id in the previous DAG Run.
To get this to run, I simply turned the DAG on in the main interface, so I believe these are scheduled runs.
Upvotes: 10
Views: 4075
Reputation: 3056
From the changelog for Airflow 1.7.1 (2016-05-19):
- Treat SKIPPED and SUCCESS the same way when evaluating depends_on_past=True
It looks like the condition is checked here:
airflow/ti_deps/deps/prev_dagrun_dep.py (master branch)
line 75: if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:
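To make the effect of that check concrete, here is a minimal sketch in plain Python that does not import Airflow. The State enum, past_dependency_met, and simulate are hypothetical names invented for this illustration; only the set {SKIPPED, SUCCESS} is taken from the line quoted above. It models the two dummy tasks from the question across alternating minutes.

```python
from enum import Enum


class State(Enum):
    # Hypothetical stand-in for Airflow's task states, for illustration only.
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    NO_STATUS = "no_status"


# States of the previous task instance that satisfy depends_on_past,
# mirroring the set in the quoted condition.
PAST_OK_STATES = {State.SKIPPED, State.SUCCESS}


def past_dependency_met(previous_state):
    """Return True if a task with depends_on_past=True may be scheduled.

    previous_state is None only when there is no earlier run at all.
    """
    if previous_state is None:
        return True
    return previous_state in PAST_OK_STATES


def simulate(minutes):
    """Simulate the two branch targets over a sequence of run minutes."""
    previous = {"dummy_task_1": None, "dummy_task_2": None}
    history = []
    for minute in minutes:
        # Same rule as should_run in the question: even minute picks task 1.
        chosen = "dummy_task_1" if minute % 2 == 0 else "dummy_task_2"
        run = {}
        for task_id, prev_state in previous.items():
            if not past_dependency_met(prev_state):
                run[task_id] = State.NO_STATUS  # blocked by depends_on_past
            elif task_id == chosen:
                run[task_id] = State.SUCCESS
            else:
                run[task_id] = State.SKIPPED
        history.append(run)
        previous = run
    return history


if __name__ == "__main__":
    for minute, run in zip(range(4), simulate(range(4))):
        print(minute, {task: state.value for task, state in run.items()})
```

Because a skipped previous instance counts as acceptable, no task in this model ever ends up blocked, which matches what the Tree View in the question shows. If SKIPPED were removed from PAST_OK_STATES, each task would be blocked on the run after it was first skipped, which is the behavior the question expected.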
Upvotes: 7