Reputation: 1264
I want the tasks in the DAG to all finish before the 1st task of the next run gets executed.
I have max_active_runs = 1, but this still happens.
default_args = {
    'depends_on_past': True,
    'wait_for_downstream': True,
    'max_active_runs': 1,
    'start_date': datetime(2018, 3, 4),
    'owner': 't.n',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=4)
}
dag = DAG('example', default_args=default_args, schedule_interval=schedule_interval)
(All of my tasks are dependent on the previous task. Airflow version is 1.8.0)
Thank you
Upvotes: 46
Views: 60802
Reputation: 6279
You've put 'max_active_runs': 1 into the default_args parameter, which is not the correct spot. max_active_runs is a constructor argument for a DAG and should not be put into the default_args dictionary.
Here is an example DAG that shows where you need to move it to:
dag_args = {
    'owner': 'Owner',
    # 'max_active_runs': 1,                     # <--- Here is where you had it.
    'depends_on_past': False,                   #   |
    'start_date': datetime(2018, 1, 1, 12, 0),  #   |
    'email_on_failure': False                   #   |
}                                               #   |
                                                #   |
sched = timedelta(hours=1)                      #   |
dag = DAG(                                      #   |
    job_id,                                     #   |
    default_args=dag_args,                      #   |
    schedule_interval=sched,                    #   V
    max_active_runs=1                           # <---- Here is where it is supposed to be
)
If the tasks that your DAG is running are actually sub-DAGs, then you may need to pass max_active_runs into the sub-DAGs too, but I'm not 100% sure on this.
Upvotes: 72
Reputation: 5
Actually, you should set DAG_CONCURRENCY=1 as an environment variable. It worked for me.
Upvotes: -3
Reputation: 1264
I moved max_active_runs out of default_args and passed it as an argument to DAG() instead, and it worked.
Thanks SimonD for giving me the idea, though not directly pointing to it in your answer.
Upvotes: 61
Reputation: 644
You can use XComs to do it. First, add two PythonOperators, 'start' and 'end', to the DAG. Set the flow as:
start ---> ALL TASKS ----> end
'end' will always push a variable last_success = context['execution_date'] to XCom (xcom_push). This requires provide_context=True in the PythonOperators.
'start' will always check XCom (xcom_pull) to see whether a last_success variable exists with a value equal to the previous DagRun's execution_date, or to the DAG's start_date (to let the very first run start).
Followed this answer
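A rough sketch of the two callables described above, written as plain functions so the logic can be tried without a scheduler. Several things here are assumptions and not part of the original answer: the key name LAST_SUCCESS_KEY, the constant DAG_START_DATE, the function names, the reading that the first run is let through when its execution_date equals the start date, and that the task context exposes prev_execution_date. The include_prior_dates=True argument is used because the value was pushed by an earlier run. FakeTaskInstance is only a local stand-in for Airflow's TaskInstance.

```python
from datetime import datetime

LAST_SUCCESS_KEY = "last_success"       # hypothetical XCom key name
DAG_START_DATE = datetime(2018, 3, 4)   # assumed to match the DAG's start_date


def mark_run_finished(**context):
    """Callable for the 'end' task: record this run's execution_date."""
    context["ti"].xcom_push(key=LAST_SUCCESS_KEY,
                            value=context["execution_date"])


def check_previous_run_finished(**context):
    """Callable for the 'start' task: raise unless the previous run reached 'end'."""
    if context["execution_date"] == DAG_START_DATE:
        return  # very first run: there is no previous run to wait for

    # The value was pushed by an earlier run, so prior dates must be included.
    last_success = context["ti"].xcom_pull(task_ids="end",
                                           key=LAST_SUCCESS_KEY,
                                           include_prior_dates=True)
    if last_success != context["prev_execution_date"]:
        raise RuntimeError("previous DAG run has not finished yet")


class FakeTaskInstance(object):
    """Local stand-in for a TaskInstance; keeps XCom values in a dict."""

    def __init__(self, store):
        self.store = store

    def xcom_push(self, key, value):
        self.store[key] = value

    def xcom_pull(self, task_ids=None, key=None, include_prior_dates=False):
        return self.store.get(key)
```

In a real DAG these would be passed as python_callable to the 'start' and 'end' PythonOperators with provide_context=True. Because 'start' raises when the check fails, the task would go through its normal retries, so retry_delay effectively acts as the polling interval.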
Upvotes: -5