k16

Reputation: 481

How to make Airflow's schedule_interval work correctly

I want to try using Airflow instead of cron, but schedule_interval doesn't work as I expected.

I wrote the Python code below.
As I understand it, Airflow should have run the DAG at "2016/03/30 8:15:00", but it did not run at that time.

When I changed the setting to "'schedule_interval': timedelta(minutes = 5)", it seemed to work correctly.

The "notice_slack.sh" script just calls the Slack API to post to my channels.

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29, 8, 15),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = '/tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

I want to run some of my scripts at a specific time every day, like this cron setting:

15 08 * * * bash /tmp/notice_slack.sh

I have read the "Scheduling & Triggers" documentation, and I know Airflow behaves a little differently from cron.
So I tried to adjust the "start_date" and "schedule_interval" settings.

Does anyone know what I should do?

airflow version

INFO - Using executor LocalExecutor

v1.7.0

amazon-linux-ami/2015.09-release-notes

Upvotes: 20

Views: 148708

Answers (4)

SunnyAk

Reputation: 588

First, your start date should be in the past. Instead of 'start_date': datetime(2016, 3, 29, 8, 15), would you try 'start_date': datetime(2016, 2, 29, 8, 15)

and apply 'catchup':False to prevent backfills - unless this was something you wanted to do.

From the Airflow documentation: the Airflow scheduler triggers the task soon after start_date + schedule_interval has passed.
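To make the "start_date + schedule_interval" rule concrete, here is a minimal sketch in plain Python. It is only datetime arithmetic, not Airflow code, and the helper name first_trigger_time is made up for illustration. It assumes a timedelta-based schedule.

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date, schedule_interval):
    """Earliest moment the first run can fire: the end of the first interval.

    Hypothetical helper for illustration only; Airflow does this internally.
    """
    return start_date + schedule_interval


start_date = datetime(2016, 3, 29, 8, 15)

# Daily interval: the first run fires one full day after start_date.
print(first_trigger_time(start_date, timedelta(days=1)))     # 2016-03-30 08:15:00

# Five-minute interval: the first run fires five minutes after start_date.
print(first_trigger_time(start_date, timedelta(minutes=5)))  # 2016-03-29 08:20:00
```

This is why a short interval such as timedelta(minutes=5) appears to "work" right away, while a daily interval needs a whole day to elapse first.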

The schedule interval can be supplied as a cron expression. If you want to run it every day at 8:15 AM, the expression would be '15 8 * * *'

If you want to run it only on Oct 31st at 8:15 AM, the expression would be '15 8 31 10 *'

To supply this, set 'schedule_interval':'15 8 * * *' in your DAG properties

You can explore cron expressions further at https://crontab.guru/
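If the five cron fields are hard to keep straight, a small sketch like the following labels them. It is plain Python, not part of Airflow, and the names CRON_FIELDS and describe_cron are made up for this example. It only splits the expression and does not validate the values.

```python
# Order of the five fields in a standard cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Map each field of a five-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError("expected 5 fields, got %d" % len(parts))
    return dict(zip(CRON_FIELDS, parts))


# Every day at 08:15.
print(describe_cron("15 8 * * *"))

# Only on October 31st at 08:15.
print(describe_cron("15 8 31 10 *"))
```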

Alternatively, there are Airflow presets (for example @hourly or @daily).

If any of these meet your requirements, it would simply be 'schedule_interval':'@hourly'

Lastly, you can also supply the schedule as a Python timedelta object, e.g. to run every 12 hours

'schedule_interval': timedelta(hours=12)

Upvotes: 16

dlamblin

Reputation: 45321

With the example you've given, @daily will run your job after it passes midnight. You might try changing it to timedelta(days=1), which is relative to your fixed start_date that includes 08:15. Or you could use a cron spec, schedule_interval='15 08 * * *', in which case any start date prior to 8:15 on the day BEFORE the day you wanted the first run would work.
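As a rough illustration of where the schedule boundaries fall for each option, here is a sketch in plain Python. It is not Airflow code, and next_daily_fire is a hypothetical helper. It relies on @daily being shorthand for the cron expression '0 0 * * *', and it only shows the first boundary after start_date, not the exact moment the scheduler launches a run.

```python
from datetime import datetime, timedelta


def next_daily_fire(after, hour, minute):
    """First wall-clock time strictly after `after` that matches hour:minute."""
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)
    return candidate


start_date = datetime(2016, 3, 29, 8, 15)

# @daily ('0 0 * * *'): boundaries sit on midnight, not on 08:15.
print(next_daily_fire(start_date, 0, 0))    # 2016-03-30 00:00:00

# '15 08 * * *': boundaries sit on 08:15 every day.
print(next_daily_fire(start_date, 8, 15))   # 2016-03-30 08:15:00

# timedelta(days=1): boundaries are measured from start_date itself.
print(start_date + timedelta(days=1))       # 2016-03-30 08:15:00
```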

Note that depends_on_past: False is already the default, and you may have confused its behavior with catchup=False in the DAG parameters, which avoids creating past runs for the time between the start date and now during which the DAG's schedule interval would have fired.
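To get a feel for how many past runs a catch-up would create, here is a sketch in plain Python. It is not Airflow code; completed_intervals is a hypothetical helper, and the "now" value is an assumed example time. It simply counts the intervals that have fully elapsed since the start date.

```python
from datetime import datetime, timedelta


def completed_intervals(start_date, interval, now):
    """Return the start of every interval that has fully elapsed by `now`."""
    starts = []
    current = start_date
    while current + interval <= now:
        starts.append(current)
        current += interval
    return starts


start_date = datetime(2016, 2, 29, 8, 15)   # a start date a month in the past
now = datetime(2016, 3, 30, 9, 0)           # assumed "current" time

pending = completed_intervals(start_date, timedelta(days=1), now)
print(len(pending))   # 30 past intervals a catch-up would have to run
print(pending[-1])    # 2016-03-29 08:15:00, the most recent completed interval
```

With catch-up disabled, only the most recent of these intervals would be expected to get a run.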

Upvotes: 1

p.magalhaes

Reputation: 8354

Airflow will start your DAG once 2016/03/30 8:15:00 + the schedule interval (daily) has passed, so your DAG will run on 2016/03/31 8:15:00.

You can check the Airflow FAQ

Upvotes: 18

ansvver

Reputation: 317

Try this:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="15 08 * * *",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = 'bash /tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

start_date (datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval.

schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) – Defines how often that DAG runs, this timedelta object gets added to your latest task instance’s execution_date to figure out the next schedule.

Simply setting schedule_interval and bash_command to match your cron entry is enough.

Upvotes: 20
