Reputation: 1543
I am using Airbnb's Airflow and created the simple DAG shown below. However, the scheduler keeps running the task even though I set the schedule interval to hourly (or any other interval). Another thing I noticed is that if I set the schedule interval to '@once', the DAG never runs at all.
I followed the conventions here: http://airflow.readthedocs.org/en/latest/scheduler.html#dag-runs
The simple DAG I'm using:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 5),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'schedule_test', default_args=default_args, schedule_interval='@hourly')

# t1 is an example of a task created by instantiating an operator
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
Scheduler output for reference. As you can see, it keeps running the task over and over, even though my DAG has schedule_interval='@hourly':
2016-01-06 20:34:37,266 - root - INFO - Starting the scheduler
2016-01-06 20:34:37,267 - root - INFO - Filling up the DagBag from /usr/local/airflow/dags
2016-01-06 20:34:37,267 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:37,272 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:37,281 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:37,288 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:37,291 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:37,318 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False>
2016-01-06 20:34:37,321 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:37,323 - root - INFO - Checking dependencies on 1 tasks instances, minus 0 skippable ones
2016-01-06 20:34:37,326 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T00:00:00 --local -sd DAGS_FOLDER/test.py
2016-01-06 20:34:37,347 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:37,347 - root - INFO - Loop took: 0.071298 seconds
2016-01-06 20:34:37,356 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:37,357 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:22.357089
2016-01-06 20:34:42,269 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:42,274 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:42,277 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:42,295 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:42,296 - root - INFO - Loop took: 0.029931 seconds
2016-01-06 20:34:42,309 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:42,310 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:27.310303
2016-01-06 20:34:42,314 - root - INFO - command: airflow run schedule_test print_date 2016-01-06T00:00:00 --local -sd DAGS_FOLDER/test.py
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T00:00:00
2016-01-06 20:34:47,895 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:47,900 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:47,904 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:47,922 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False>
2016-01-06 20:34:47,925 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False> successful
2016-01-06 20:34:47,926 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:34:47,928 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:34:47,937 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:47,940 - root - INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
2016-01-06 20:34:47,943 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T01:00:00 --local -sd DAGS_FOLDER/test.py
2016-01-06 20:34:47,947 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T02:00:00 --local -sd DAGS_FOLDER/test.py
2016-01-06 20:34:47,960 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:47,960 - root - INFO - Loop took: 0.067789 seconds
2016-01-06 20:34:47,968 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:47,968 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:32.968940
2016-01-06 20:34:52,901 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:52,906 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:52,909 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:52,942 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:34:52,946 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:34:52,948 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 03:00:00: scheduled__2016-01-06T03:00:00, externally triggered: False>
2016-01-06 20:34:52,950 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:52,953 - root - INFO - Checking dependencies on 3 tasks instances, minus 0 skippable ones
2016-01-06 20:34:52,961 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T03:00:00 --local -sd DAGS_FOLDER/test.py
2016-01-06 20:34:52,975 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:52,976 - root - INFO - Loop took: 0.07741 seconds
2016-01-06 20:34:52,982 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:52,983 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:37.983542
2016-01-06 20:34:52,987 - root - INFO - command: airflow run schedule_test print_date 2016-01-06T02:00:00 --local -sd DAGS_FOLDER/test.py
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T02:00:00
2016-01-06 20:34:58,583 - root - INFO - command: airflow run schedule_test print_date 2016-01-06T01:00:00 --local -sd DAGS_FOLDER/test.py
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T01:00:00
2016-01-06 20:35:04,215 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:35:04,223 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:35:04,229 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:35:04,263 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:35:04,267 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False> successful
2016-01-06 20:35:04,268 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:35:04,272 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False> successful
2016-01-06 20:35:04,273 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 03:00:00: scheduled__2016-01-06T03:00:00, externally triggered: False>
2016-01-06 20:35:04,276 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 04:00:00: scheduled__2016-01-06T04:00:00, externally triggered: False>
Upvotes: 3
Views: 5275
Reputation: 21
I have seen this issue. In the airflow.cfg file you can see the number of workers and the worker refresh time; by default the refresh is 30 seconds, so every 30 seconds a new worker is started, which loads all the DAGs again and runs them. I set the worker refresh to 0 to stop this issue.
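If the setting this answer refers to is the webserver's worker refresh interval, the relevant airflow.cfg entries might look like the sketch below. This is an assumption on my part: worker_refresh_interval and workers live under the [webserver] section in stock configs, but the exact key names may vary by Airflow version, so verify against your own file.

[webserver]
# Number of gunicorn workers serving the web UI.
workers = 4
# Hypothetical fix from this answer: 0 disables the periodic worker
# restart that re-parses the DAGs folder (the default is 30 seconds).
worker_refresh_interval = 0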
Upvotes: 2
Reputation: 4024
Are you providing the start_date correctly?
It can be provided in the program or while running the scheduler. If this date is in the past, Airflow will simply figure out the past intervals in which your DAG should have executed and queue them for execution (backfill).
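A minimal sketch of what this suggests (my assumption about the intended fix, not code from the question): keep start_date static and recent, so the scheduler has only a few past intervals to backfill.

"""
A minimal sketch, assuming the goal is to limit backfill: use a fixed,
recent start_date. (Later Airflow releases also accept catchup=False on
the DAG to skip backfilling entirely.)
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import BashOperator

default_args = {
    'owner': 'airflow',
    # A static date just before "now": a far-past date triggers a large
    # backfill, while a dynamic value like datetime.now() confuses the
    # scheduler's run-interval bookkeeping.
    'start_date': datetime(2016, 1, 6),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'schedule_test', default_args=default_args, schedule_interval='@hourly')

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)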
Upvotes: 0