comet

Reputation: 65

Google Cloud Composer DAG is not getting triggered

I'm scheduling a DAG to run at 04:00 AM, Tuesday through Saturday, Eastern Time (New York), starting from today, 2020-08-11. After writing the code and deploying it, I expected the DAG to get triggered, but even after refreshing the Airflow UI page a couple of times it still hasn't triggered. I am using Airflow version v1.10.9-composer with Python 3.

This is my DAG code:

"""
This DAG executes a retrieval job
"""

# Required packages to execute DAG

from __future__ import print_function
import pendulum
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule


local_tz = pendulum.timezone("America/New_York")

# DAG parameters

default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': None,
    'retry_delay': timedelta(minutes=5)
}

# create DAG object with Name and default_args
with DAG(
        'retrieve_files',
        schedule_interval='0 4 * * 2-6',
        description='Retrieves files from sftp',
        max_active_runs=1,
        catchup=True,
        default_args=default_args
) as dag:
    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class
    start_dummy = DummyOperator(
        task_id='start',
        dag=dag
    )

    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    retrieve_file = SSHOperator(
        ssh_conn_id="my_conn",
        task_id='retrieve_file',
        command='/usr/bin/python3  /path_to_file/getFile.py',
        dag=dag)


    dag.doc_md = __doc__

    retrieve_file.doc_md = """\
    #### Task Documentation
    Connects to sftp and retrieves files.
    """

    start_dummy >> retrieve_file >> end_dummy

Upvotes: 0

Views: 1525

Answers (1)

aga

Reputation: 3903

Referring to the official documentation:

The scheduler runs your job one schedule_interval AFTER the start date.

If your start_date is 2020-01-01 and schedule_interval is @daily, the first run will be created on 2020-01-02, i.e. after your start date has passed.

In order to run a DAG at a specific time every day (including today), start_date needs to be set to a time in the past and schedule_interval needs to contain the desired time in cron format. It is important to set that past datetime correctly, or the DAG won't be triggered when you expect it.

In your case, set the start_date to the Tuesday of the previous week, (2020, 8, 4), so that at least one full schedule_interval has already passed by the time you expect the first run.
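
To make the timing concrete, here is a minimal sketch (not part of any DAG) that reproduces the scheduler's arithmetic with croniter, the library Airflow itself uses to evaluate cron schedules. Timezone handling is omitted for brevity, so the datetimes below stand for New York wall times. With your original start_date of Monday 2020-08-10 04:00, the first execution_date is Tuesday 2020-08-11 04:00, but the run for that interval is only created once the interval closes at the next cron point, Wednesday 2020-08-12 04:00:

from datetime import datetime
from croniter import croniter

# Original start_date from the question, expressed as New York wall time
start_date = datetime(2020, 8, 10, 4)  # Monday 04:00

cron = croniter('0 4 * * 2-6', start_date)

# First cron point after start_date = execution_date of the first interval
first_execution_date = cron.get_next(datetime)  # 2020-08-11 04:00 (Tuesday)

# The scheduler creates that run only once the interval has passed,
# i.e. at the next cron point
first_run_created_at = cron.get_next(datetime)  # 2020-08-12 04:00 (Wednesday)

print(first_execution_date, first_run_created_at)

Moving start_date back to 2020-08-04 makes the first interval close well in the past, so the scheduler (with catchup enabled) starts creating runs right away.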

Let's take a look at the following example, which shows how to run a job at 04:00 AM, Tuesday through Saturday, New York time:

from datetime import datetime, timedelta
from airflow import models
import pendulum
from airflow.operators import bash_operator

local_tz = pendulum.timezone("America/New_York")

default_dag_args = {
    'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),
    'retries': 0,
}

with models.DAG(
        'Test',
        default_args=default_dag_args,
        schedule_interval='00 04 * * 2-6') as dag:
    # DAG code
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

I recommend checking the "What's the deal with start_date?" section of the Airflow documentation.

Upvotes: 1
