Rachel Krueger

Reputation: 83

Apache Airflow does not enforce dagrun_timeout

I am using Apache Airflow 1.10.3 with the SequentialExecutor, and I would like the DAG to fail after a certain amount of time if it has not finished. I tried setting dagrun_timeout in the example code below:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 0,
}

dag = DAG(
    'min_timeout',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
    dagrun_timeout=timedelta(seconds=30),
    max_active_runs=1,
)

t1 = BashOperator(
    task_id='fast_task',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='slow_task',
    bash_command='sleep 45',
    dag=dag)

t2.set_upstream(t1)

slow_task alone takes longer than the limit set by dagrun_timeout, so my understanding is that Airflow should stop the DAG run. However, this does not happen: slow_task is allowed to run for its entire duration, and only afterwards is the run marked as failed, which does not kill the task or the DAG run as desired. Setting execution_timeout on slow_task does cause the task to be killed at the specified time limit, but I would prefer an overall time limit for the DAG rather than specifying execution_timeout for each task.

Is there anything else I should try to achieve this behavior, or any mistakes I can fix?
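For reference, here is a minimal sketch of the per-task workaround I would rather avoid: execution_timeout is passed through default_args so that every task inherits the 30-second limit (same imports and operators as above; the DAG id is only illustrative).

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# execution_timeout in default_args is applied to every operator in the DAG,
# so each task is killed individually once it exceeds 30 seconds.
default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 0,
    'execution_timeout': timedelta(seconds=30),
}

dag = DAG(
    'per_task_timeout',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
    max_active_runs=1,
)

t1 = BashOperator(task_id='fast_task', bash_command='date', dag=dag)
t2 = BashOperator(task_id='slow_task', bash_command='sleep 45', dag=dag)  # killed after 30s
t2.set_upstream(t1)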

Upvotes: 8

Views: 5267

Answers (2)

Ihor Konovalenko

Reputation: 1407

According to the Airflow documentation:

dagrun_timeout (datetime.timedelta) – specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns, and only once the # of active DagRuns == max_active_runs.

So dagrun_timeout will not take effect for non-scheduled DagRuns (e.g. runs triggered manually), nor while the number of active DagRuns is below max_active_runs.
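As a minimal sketch (the dag_id and intervals are only illustrative), both conditions from the quoted documentation have to hold before the timeout can be enforced: the run must be created by the scheduler, and the DAG must already be at its max_active_runs limit.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# dagrun_timeout is only checked for scheduler-created runs, and only once the
# number of active runs has reached max_active_runs (1 here). A run started
# manually, e.g. with `airflow trigger_dag timeout_demo`, is never timed out.
dag = DAG(
    'timeout_demo',
    default_args={'owner': 'me', 'start_date': datetime(2019, 6, 1)},
    schedule_interval=timedelta(minutes=5),
    dagrun_timeout=timedelta(seconds=30),
    max_active_runs=1,
)

BashOperator(task_id='slow_task', bash_command='sleep 120', dag=dag)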

Upvotes: 4

Oluwafemi Sule

Reputation: 38932

The Airflow scheduler runs its loop at an interval of at least SCHEDULER_HEARTBEAT_SEC (5 seconds by default).

Bear in mind the "at least" here: the scheduler performs work inside each cycle that can delay the start of the next one.

These actions include:

  • parsing the DAG files
  • filling up the DagBag
  • checking DagRuns and updating their state
  • scheduling the next DagRun

In your example, the slow task isn't terminated at dagrun_timeout because the scheduler only notices the timeout on the next cycle of its loop, and with the SequentialExecutor that cycle only starts after the task has completed, since tasks are executed synchronously inside the scheduler process.
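If you want to check the effective heartbeat on your installation, a small sketch (assuming the standard scheduler_heartbeat_sec option in the [scheduler] section of airflow.cfg):

from airflow.configuration import conf

# How often (in seconds) the scheduler wakes up for a new cycle; the work
# listed above can push the real interval well above this value.
heartbeat_sec = conf.getint('scheduler', 'scheduler_heartbeat_sec')
print(heartbeat_sec)  # 5 unless overridden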

Upvotes: 4
