DougKruger

Reputation: 4624

set airflow schedule interval

I have created tasks in Airflow that are scheduled to run hourly, with start_date set to 2016-11-16:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 16),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test_hourly_job', default_args=default_args, schedule_interval="@hourly")

I started Airflow at the current time, which is 10:00 AM, and I can see it running the job for 00:00, then 01:00, and so on:

INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T00:00:00 --local -sd DAGS_FOLDER/test_airflow.py
........
........
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T01:00:00 --local -sd DAGS_FOLDER/test_airflow.py 
.......
....... 

How can I configure Airflow to start from, say, the current time and run hourly going forward, instead of starting from 00:00?

Upvotes: 1

Views: 9497

Answers (3)

Neil

Reputation: 849

Airflow provides a gem of an operator called the LatestOnlyOperator, which skips tasks that are not part of the most recent scheduled run of a DAG. The LatestOnlyOperator skips itself and all of its immediate downstream tasks if the current time is not between its execution_time and the next scheduled execution_time. This avoids wasting CPU cycles on backfilled runs.

from datetime import datetime, timedelta

# Airflow 1.x import paths
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 16),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('test_hourly_job', default_args=default_args, schedule_interval="@hourly")

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)

latest_only >> task1

latest_only should always be upstream of the tasks that you want to skip. The advantage of the LatestOnlyOperator is that whenever you restart the DAG, it skips the tasks for all the previous intervals and only runs the current one.
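To make the skip rule concrete, here is a minimal sketch of the idea in plain Python, following the description above; it is not Airflow's actual implementation, and the is_latest_run helper and the one-hour window are assumptions for illustration only:

from datetime import datetime, timedelta

def is_latest_run(execution_date, interval=timedelta(hours=1), now=None):
    """Sketch of the LatestOnlyOperator idea: a run proceeds only if 'now'
    falls inside the schedule window that this execution_date belongs to."""
    now = now or datetime.now()
    return execution_date <= now < execution_date + interval

now = datetime(2016, 11, 16, 10, 30)

# Backfilled run for midnight: its window (00:00-01:00) is long past, so it is skipped
print(is_latest_run(datetime(2016, 11, 16, 0, 0), now=now))   # False
# Run whose window covers the current time: this one executes
print(is_latest_run(datetime(2016, 11, 16, 10, 0), now=now))  # True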

Also, it's better not to hard-code the start time. Instead use:

from datetime import datetime, timedelta

START_DATE = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
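For example, a minimal sketch plugging this computed START_DATE into the default_args from the question; the printed value is only illustrative and depends on the day the DAG file is parsed:

from datetime import datetime, timedelta

# Midnight of the previous day, computed when the DAG file is parsed
START_DATE = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': START_DATE,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

print(START_DATE)  # e.g. 2016-11-15 00:00:00 if the file is parsed on 2016-11-16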

Upvotes: 0

Siddharth Kumar

Reputation: 698

  1. Install Airflow in a Python virtual environment.
  2. Activate the environment.
  3. Set load_examples = False in ~/airflow/airflow.cfg
  4. Start the Airflow webserver: $ airflow webserver -p <port>
  5. Copy the DAG below into ~/airflow/dags
  6. Start the scheduler: $ airflow scheduler

Now, for the schedule interval, see the code below.

Try this:

 'start_date': datetime.now()
 dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")

or

'start_date': datetime(2015, 6, 1),
dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")

Full code

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    #'start_date': datetime(2015, 6, 1),
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    #'retries': 1,
    #'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")  # runs every minute
#dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Upvotes: 0

Prafull kadam

Reputation: 208

In your question you define the dictionary default_args, which contains the key 'start_date': datetime(2016, 11, 16).

Here a datetime object is created from year, month, and day only; since no time is provided, it defaults to 00:00, which is why your script runs at 00:00. You can check this in Python:

from datetime import datetime

datetime(2016, 11, 16)
# The datetime object is created with a default time of 00:00:
# datetime.datetime(2016, 11, 16, 0, 0)

If you want the process to start at the current date and time, you can set:

'start_date': datetime.now()

If you want the current time combined with a specific date, you can do it as follows:

current_date = datetime.now() 
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 16, current_date.hour, current_date.minute),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_hourly_job', default_args=default_args, schedule_interval="@hourly")
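For instance, if the DAG file is parsed at 10:00 AM on 2016-11-16 (the scenario in the question), the resulting start_date is the current hour rather than midnight; this small illustration uses a fixed timestamp in place of datetime.now():

from datetime import datetime

# Pretend the DAG file is parsed at 10:00 AM on 2016-11-16, as in the question
current_date = datetime(2016, 11, 16, 10, 0)
start_date = datetime(2016, 11, 16, current_date.hour, current_date.minute)
print(start_date)  # 2016-11-16 10:00:00 -- hourly runs are scheduled from here, not from 00:00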

Upvotes: 3
