Reputation: 65
I'm scheduling a DAG to run at 04:00 AM, Tuesday through Saturday, Eastern Time (New York), starting today, 2020/08/11. After writing the code and deploying it, I expected the DAG to be triggered. I refreshed the Airflow UI page a couple of times, but it still isn't triggering. I am using Airflow version v1.10.9-composer with Python 3.
This is my DAG code:
"""
This DAG executes a retrieval job
"""
# Required packages to execute DAG
from __future__ import print_function
import pendulum
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

local_tz = pendulum.timezone("America/New_York")

# DAG parameters
default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': None,
    'retry_delay': timedelta(minutes=5)
}

# create DAG object with Name and default_args
with DAG(
    'retrieve_files',
    schedule_interval='0 4 * * 2-6',
    description='Retrieves files from sftp',
    max_active_runs=1,
    catchup=True,
    default_args=default_args
) as dag:
    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class
    start_dummy = DummyOperator(
        task_id='start',
        dag=dag
    )
    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )
    retrieve_file = SSHOperator(
        ssh_conn_id="my_conn",
        task_id='retrieve_file',
        command='/usr/bin/python3 /path_to_file/getFile.py',
        dag=dag)

dag.doc_md = __doc__
retrieve_file.doc_md = """\
#### Task Documentation
Connects to sftp and retrieves files.
"""
start_dummy >> retrieve_file >> end_dummy
Upvotes: 0
Views: 1525
Reputation: 3903
Referring to the official documentation:
The scheduler runs your job one schedule_interval AFTER the start date.
If your start_date is 2020-01-01 and schedule_interval is @daily, the first run will be created on 2020-01-02 i.e., after your start date has passed.
To run a DAG at a specific time every day (including today), the start_date needs to be set to a time in the past and schedule_interval needs to have the desired time in cron format. It is very important to set that past datetime (yesterday's, for a daily schedule) properly, or the trigger won't work.
In this case, we can set the start_date to a Tuesday from the previous week: (2020, 8, 4). At least one full schedule interval must have passed since the start date before the first run is created, and a start date one week back comfortably satisfies that for a Tuesday through Saturday schedule.
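To make the "one schedule_interval AFTER the start date" rule concrete, here is a minimal sketch that works out when the first run would be created for the question's start date and for the one suggested above. It is a simplified model of the quoted rule, not Airflow's scheduler code: the helper names (`is_schedule_point`, `next_schedule_point`, `first_run`) are made up for illustration, the schedule is hard-coded to 04:00 Tuesday through Saturday, and the datetimes are naive values read as New York wall-clock time.

```python
from datetime import datetime, timedelta

# Python's weekday() counts Monday=0 ... Sunday=6, so Tuesday-Saturday is 1-5.
# (The cron field '2-6' counts from Sunday=0, hence the different numbers.)
RUN_WEEKDAYS = {1, 2, 3, 4, 5}
RUN_HOUR = 4  # 04:00, New York wall-clock time


def is_schedule_point(ts):
    """True if ts is exactly 04:00 on a Tuesday-Saturday."""
    on_the_hour = (ts.hour, ts.minute, ts.second, ts.microsecond) == (RUN_HOUR, 0, 0, 0)
    return on_the_hour and ts.weekday() in RUN_WEEKDAYS


def next_schedule_point(after):
    """First 04:00 Tuesday-Saturday strictly later than `after`."""
    candidate = after.replace(hour=RUN_HOUR, minute=0, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)
    while candidate.weekday() not in RUN_WEEKDAYS:
        candidate += timedelta(days=1)
    return candidate


def first_run(start_date):
    """Return (execution_date, trigger_time) of the first scheduled run."""
    if is_schedule_point(start_date):
        execution_date = start_date
    else:
        execution_date = next_schedule_point(start_date)
    # Assumed rule: the run is created only once its whole interval has elapsed.
    trigger_time = next_schedule_point(execution_date)
    return execution_date, trigger_time


for start in (datetime(2020, 8, 10, 4), datetime(2020, 8, 4, 4)):
    execution_date, trigger_time = first_run(start)
    print(start, "->", execution_date, "triggers at", trigger_time)
```

Under this model, a start date of Monday 2020-08-10 gives a first run with execution date 2020-08-11 that is only triggered on 2020-08-12 at 04:00, which would explain why nothing appeared on 2020-08-11. With a start date of 2020-08-04 and catchup enabled, the earlier intervals are backfilled, and the interval that begins on Saturday 2020-08-08 closes on Tuesday 2020-08-11 at 04:00.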
Let's take a look at the following example, which shows how to run a job at 04:00 AM, Tuesday through Saturday, Eastern Time (New York):
from datetime import datetime, timedelta
from airflow import models
import pendulum
from airflow.operators import bash_operator

local_tz = pendulum.timezone("America/New_York")

default_dag_args = {
    'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),
    'retries': 0,
}

with models.DAG(
        'Test',
        default_args=default_dag_args,
        schedule_interval='00 04 * * 2-6') as dag:
    # DAG code
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
I recommend checking the "What's the deal with start_date?" entry in the Airflow documentation.
Upvotes: 1