Xiang Zhang

Reputation: 2973

How to align the execution date in a non-catchup setting in Airflow?

With catchup enabled, I observed that the execution date sent to the executor is correctly aligned. But when I turn catchup off like this:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': airflow.utils.dates.days_ago(5),
    'schedule_interval': '@daily'
}

dag = DAG('xiang-01', catchup=False, default_args=default_args)

task = BashOperator(
    task_id='task',
    bash_command='echo "{{ task_instance_key_str }} {{ ts }}" && sleep 10',
    dag=dag)

The execution date is not aligned. For example, the rendered command is:

echo "xiang-01__task__20180909 2018-09-09T22:33:17.961926+00:00" && sleep 10

According to the documentation, it should be aligned: https://airflow.apache.org/scheduler.html#backfill-and-catchup

So what am I missing?

Update:

To be more precise: my start date is set via days_ago(5), which resolves to midnight (00:00:00) five days ago. I therefore expected the execution date to be aligned to midnight as well, something like 2018-09-09T00:00:00. What I got instead is 2018-09-09T22:33:17.961926+00:00, which looks like it is aligned to the time when I un-paused the DAG.

Upvotes: 0

Views: 512

Answers (1)

Xiang Zhang

Reputation: 2973

I figured it out. The following modification works:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': airflow.utils.dates.days_ago(5),
    # 'schedule_interval' removed from here: default_args is only passed to operators
}

# schedule_interval is passed to the DAG constructor instead
dag = DAG('xiang-02', catchup=False, default_args=default_args, schedule_interval='@daily')

task = BashOperator(
    task_id='task',
    bash_command='echo "{{ task_instance_key_str }} {{ ts }}" && sleep 10',
    dag=dag)

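The catch is that schedule_interval is an argument of the DAG itself, not a task default. default_args is only passed on to the operators, so a schedule_interval placed there is ignored by the DAG, which then falls back to its default schedule (timedelta(days=1) in this Airflow version, as far as I can tell). Once I pass schedule_interval='@daily' directly to the DAG constructor, the execution date is aligned to midnight as expected.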
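
To illustrate why the two schedules behave differently, here is a minimal sketch in plain Python. It is not Airflow's scheduler code: the helper names and the "now" timestamp are hypothetical, and the model is simplified. It only shows that stepping back by a timedelta keeps the time of day of "now", whereas a daily cron-style schedule snaps to midnight.

```python
from datetime import datetime, timedelta, timezone


def latest_run_timedelta(now, interval=timedelta(days=1)):
    """Simplified model of an interval schedule: step back from 'now'.

    The time of day of 'now' is preserved, so the result is not
    aligned to midnight.
    """
    return now - interval


def latest_run_daily(now):
    """Simplified model of '@daily': start of the last complete day.

    Snap 'now' to midnight, then go back one day.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=1)


# Hypothetical moment at which the DAG is un-paused (an assumption).
now = datetime(2018, 9, 10, 22, 33, 17, tzinfo=timezone.utc)

unaligned = latest_run_timedelta(now)
aligned = latest_run_daily(now)

print(unaligned.isoformat())  # 2018-09-09T22:33:17+00:00
print(aligned.isoformat())    # 2018-09-09T00:00:00+00:00
```

The first result carries over the wall-clock time of the un-pause moment, which matches the kind of timestamp seen in the question; the second is the midnight-aligned date I was expecting.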

Upvotes: 1
