Reputation: 4624
I have created tasks in Airflow which I scheduled to run hourly, with start_date
set to 2016-11-16:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_hourly_job', default_args=default_args, schedule_interval="@hourly")
I kicked off Airflow at the current time, which is 10:00 AM,
and I can see Airflow running the task from 00:00,
then 01:00,
and so on:
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T00:00:00 --local -sd DAGS_FOLDER/test_airflow.py
........
........
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T01:00:00 --local -sd DAGS_FOLDER/test_airflow.py
.......
.......
How do I configure Airflow to start from the current time and run hourly going forward, instead of starting from 00:00?
Upvotes: 1
Views: 9497
Reputation: 849
Airflow provides a gem of an operator called LatestOnlyOperator, which skips tasks that are not part of the most recent scheduled run of a DAG. The LatestOnlyOperator skips itself and all of its immediate downstream tasks if the current time is not between its execution_time and the next scheduled execution_time. This avoids wasting CPU cycles on stale backfill runs.
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('test_hourly_job', default_args=default_args, schedule_interval="@hourly")
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.dummy_operator import DummyOperator

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
task1 = DummyOperator(task_id='task1', dag=dag)
latest_only >> task1
latest_only should always be upstream of the tasks you want to skip. The advantage of the LatestOnlyOperator is that whenever you restart the DAG, it skips the task instances for all previous intervals and runs only the current one.
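The skip decision described above can be sketched in plain Python. This is a hypothetical simplification, not Airflow's actual implementation: a run counts as "latest" only when the current time falls inside that run's schedule window.

```python
from datetime import datetime, timedelta

def is_latest_run(execution_date, schedule_interval, now=None):
    """Return True when `now` falls inside this run's schedule window,
    i.e. between its execution_date and the next scheduled one."""
    now = now or datetime.now()
    return execution_date <= now < execution_date + schedule_interval

# At 10:30, the backfilled 00:00 run is not the latest -> skipped:
print(is_latest_run(datetime(2016, 11, 16, 0), timedelta(hours=1),
                    now=datetime(2016, 11, 16, 10, 30)))  # False
# The 10:00 run is the latest one -> it executes:
print(is_latest_run(datetime(2016, 11, 16, 10), timedelta(hours=1),
                    now=datetime(2016, 11, 16, 10, 30)))  # True
```

Everything downstream of latest_only inherits the skip, which is why the backfill runs in the question would do no real work.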
Also, it is better not to hard-code the start time. Instead, use:
from datetime import datetime, timedelta
START_DATE = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
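To see what this expression evaluates to: datetime.min.time() is 00:00:00, so START_DATE becomes midnight of the previous day, which keeps backfill to at most one day of intervals.

```python
from datetime import datetime, timedelta

# Midnight of yesterday: combine yesterday's date with time 00:00:00.
START_DATE = datetime.combine(datetime.today() - timedelta(1),
                              datetime.min.time())
print(START_DATE.time())  # 00:00:00
```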
Upvotes: 0
Reputation: 698
Set
load_examples = False
in ~/airflow/airflow.cfg, then start the webserver:
airflow webserver -p <port>
Put your DAG file under ~/airflow/dags and start the scheduler:
$ airflow scheduler
Now, for the schedule interval, see the code below.
Try this:
'start_date': datetime.now()
dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")
or
'start_date': datetime(2015, 6, 1),
dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")
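One thing worth knowing either way: Airflow triggers a DAG run only after its schedule period has fully elapsed, and the run's execution_date stamps the start of that period. A small stdlib sketch of that rule (a simplification, not Airflow code):

```python
from datetime import datetime, timedelta

def first_trigger(start_date, interval):
    """Airflow runs a period only once it has fully elapsed, so the
    first run (execution_date == start_date) fires at start + interval."""
    return start_date + interval

# With an hourly schedule starting 2015-06-01 00:00, the first run
# actually fires at 01:00:
print(first_trigger(datetime(2015, 6, 1), timedelta(hours=1)))
```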
Full code
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
#'start_date': datetime(2015, 6, 1),
'start_date': datetime.now(),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
#'retries': 1,
#'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")  # every minute
#dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")
#
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Upvotes: 0
Reputation: 208
In your question you have the dictionary default_args
with the key 'start_date': datetime(2016, 11, 16).
The datetime object here is created from year, month, and day only; since no time is provided, it defaults to 00:00, which is why your script runs at 00:00. You can check this in Python:
from datetime import datetime
datetime(2016, 11, 16)
#That Datetime object is generated with 00:00 Time
#datetime(2016, 11, 16, 0, 0)
#If you need Current date and time to start process you can set value as:
'start_date': datetime.now()
#if you want only the current time with the respective date, you can do as follows:
current_date = datetime.now()
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16, current_date.hour, current_date.minute),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_hourly_job', default_args=default_args, schedule_interval="@hourly")
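A quick sanity check of the idea above, outside of Airflow: the constructed start_date carries the current hour and minute rather than defaulting to 00:00.

```python
from datetime import datetime

# The start_date built from current_date keeps today's time-of-day.
current_date = datetime.now()
start = datetime(2016, 11, 16, current_date.hour, current_date.minute)
print(start.hour == current_date.hour and
      start.minute == current_date.minute)  # True
```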
Upvotes: 3