Tin Ng

Reputation: 1264

How to limit Airflow to run only one instance of a DAG run at a time?

I want the tasks in the DAG to all finish before the 1st task of the next run gets executed.

I have max_active_runs = 1, but this still happens.

default_args = {
    'depends_on_past': True,
    'wait_for_downstream': True,
    'max_active_runs': 1,
    'start_date': datetime(2018, 3, 4),
    'owner': 't.n',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=4)
}

dag = DAG('example', default_args=default_args, schedule_interval=schedule_interval)

(All of my tasks are dependent on the previous task. Airflow version is 1.8.0)

Thank you

Upvotes: 46

Views: 60802

Answers (4)

Simon D

Reputation: 6279

You've put 'max_active_runs': 1 into the default_args dictionary, which is not where it belongs.

max_active_runs is a constructor argument of the DAG itself and should not go into default_args (which only supplies defaults for task-level arguments).

Here is an example DAG that shows where you need to move it to:

dag_args = { 
    'owner': 'Owner',
    # 'max_active_runs': 1, # <--- Here is where you had it.
    'depends_on_past': False,                          # |
    'start_date': datetime(2018, 1, 1, 12, 0),         # |
    'email_on_failure': False                          # |
}                                                      # |
                                                       # |
sched = timedelta(hours=1)                             # |
dag = DAG(                                             # |
          'example',                                   # |
          default_args=dag_args,                       # |
          schedule_interval=sched,                     # V
          max_active_runs=1 # <---- Here is where it is supposed to be
      ) 

If the tasks your DAG runs are actually sub-DAGs, you may also need to pass max_active_runs into the sub-DAGs, but I'm not 100% sure about this.

Upvotes: 72

karl j

Reputation: 5

Alternatively, you can set DAG_CONCURRENCY=1 as an environment variable. That worked for me.
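For reference, Airflow lets you override any config key through environment variables using the AIRFLOW__{SECTION}__{KEY} naming convention, so the setting above would look roughly like this (note that dag_concurrency limits concurrent task instances within a DAG, which is related to but not the same as limiting active DAG runs):

```shell
# Assumes Airflow's standard env-var override scheme (AIRFLOW__{SECTION}__{KEY}).
export AIRFLOW__CORE__DAG_CONCURRENCY=1
```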

Upvotes: -3

Tin Ng

Reputation: 1264

I changed my DAG to pass max_active_runs as an argument to DAG() instead of putting it in default_args, and it worked.

Thanks SimonD for giving me the idea, though your answer didn't point to it directly.

Upvotes: 61

MB11

Reputation: 644

You can use XComs to do it. First add two PythonOperators, 'start' and 'end', to the DAG, and set the flow as:

start ---> ALL TASKS ----> end

'end' always pushes a variable

last_success = context['execution_date'] to XCom (xcom_push). (This requires provide_context=True in the PythonOperators.)

And 'start' always checks XCom (xcom_pull) to see whether there is a last_success variable whose value equals the previous DagRun's execution_date, or the DAG's start_date (to let the process start).

Followed this answer
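The gating check that 'start' performs can be sketched in plain Python. This is only an illustration of the comparison logic, not Airflow API; the function name should_start and its parameters are made up for the example:

```python
from datetime import datetime

def should_start(last_success, prev_execution_date, dag_start_date):
    """Decide whether this DagRun may begin.

    last_success: the execution_date pushed to XCom by the previous
    run's 'end' task, or None if nothing has been pushed yet.
    prev_execution_date: the previous DagRun's execution_date, or
    None if this is the very first run.
    """
    if last_success is None:
        # Nothing in XCom yet: only proceed if this is the first run.
        return prev_execution_date is None
    # Proceed if the previous run finished ('end' pushed its
    # execution_date), or if the pushed value is the DAG's start_date.
    return last_success in (prev_execution_date, dag_start_date)

start_date = datetime(2018, 3, 4)

# Very first run: no previous DagRun, nothing pushed yet -> start.
print(should_start(None, None, start_date))                                   # True
# Previous run finished and pushed its execution_date -> start.
print(should_start(datetime(2018, 3, 5), datetime(2018, 3, 5), start_date))   # True
# Previous run exists but never pushed (still running or failed) -> wait.
print(should_start(None, datetime(2018, 3, 5), start_date))                   # False
```

If the check returns False, 'start' would fail (or be retried), which keeps the rest of the run from executing until the previous run's 'end' has completed.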

Upvotes: -5
