Kyle Bridenstine

Reputation: 6383

Airflow DAG Running Every Second Rather Than Every Minute

I'm trying to schedule my DAG to run every minute, but it seems to be running every second instead. Based on everything I've read, I should just need to include schedule_interval='*/1 * * * *' (every 1 minute) in my DAG and that's it, but it's not working. Here's a simple example I set up to test it:

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 4),
    'schedule_interval': '*/1 * * * *', #..every 1 minute
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    max_active_runs=3,
    schedule_interval='*/1 * * * *', #..every 1 minute
    default_args=default_args,
)

test = BashOperator(
    task_id='test',
    bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
    retries=1,
    dag=dag)

Update:

After talking with @Taylor Edmiston about his solution, we realized that the reason I needed to add catchup=False is that I installed Airflow with pip, which pulls in an outdated version. Apparently, if you're running Airflow from the master branch of its repository, you won't need to include catchup=False for it to run every minute like I wanted. So although the accepted answer fixed my issue, it doesn't quite address the underlying problem that @Taylor Edmiston discovered.
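
A quick way to confirm which Airflow release pip actually installed (a minimal sketch; Airflow exposes its version string as airflow.__version__):

import airflow

# Print the installed Airflow version to see whether you're on an
# older pip release or a build from the master branch.
print(airflow.__version__)  # e.g. '1.9.0' for a pip install around mid-2018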

Upvotes: 8

Views: 10981

Answers (2)

Taylor D. Edmiston

Reputation: 13016

Your schedule_interval on the DAG is correct: */1 * * * * is every minute.

You can also remove start_date and schedule_interval from default_args since they're redundant with the kwargs provided to the DAG.

If you changed the schedule after first creating this DAG, it's possible Airflow has gotten confused. Try deleting the DAG from the database, then restarting the scheduler and webserver. If you're on the master branch of Airflow, it's as simple as $ airflow delete_dag my_dag; otherwise, the linked answer explains how to do it on other versions.

I boiled your code down to the following to check, and it definitely runs one DAG run per minute on the master branch of Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval='*/1 * * * *',
    default_args=default_args,
)

test = BashOperator(
    task_id='test',
    bash_command='echo "one minute test"',
    dag=dag,
)

DAG runs:

[Screenshot: list of DAG runs, one run per minute]

Upvotes: 8

darthsidious

Reputation: 3081

Try adding catchup=False to the DAG() call. Your DAG is probably trying to backfill because of the start_date you declared.
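
For illustration, here's a minimal sketch of the question's DAG constructor with catchup=False added (catchup has been a DAG argument since Airflow 1.8; the dag_id and schedule are taken from the question):

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval='*/1 * * * *',  # every 1 minute
    catchup=False,  # skip backfilling the intervals between start_date and now
)

With catchup=False, the scheduler only creates runs for the current interval going forward, instead of queueing one run for every past minute since start_date, which is what looks like "running every second".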

Upvotes: 12
