Marton Trencseni

Reputation: 887

How to trigger daily DAG run at midnight local time instead of midnight UTC time

I'm in the UTC+4 timezone, so when Airflow triggers the nightly ETLs at midnight UTC, it's already 4:00 AM here. How can I tell Airflow to trigger the run for day ds at 20:00 on day ds-1, while keeping ds=ds?

Per the docs, it's highly recommended to keep all servers on UTC, which is why I'm looking for an application-level solution.

EDIT: a hacky solution is to schedule the DAG every day at 20:00, i.e. on the "previous" day, and then use tomorrow_ds instead of ds in the job. But that still looks weird in the Airflow UI, because it shows the UTC execution time.
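For illustration, a minimal sketch of that hack (the DAG id, start date, and script name here are made up):

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# Runs at 20:00 UTC, i.e. midnight in UTC+4.
dag = DAG(
    'nightly_etl_hack',  # hypothetical DAG id
    default_args={'start_date': datetime(2018, 1, 1)},
    schedule_interval='0 20 * * *',
)

# The run firing at local midnight carries the execution_date of the
# previous UTC day, so tomorrow_ds is the local day that just ended.
etl = BashOperator(
    task_id='run_etl',
    bash_command='run_etl.sh {{ tomorrow_ds }}',  # hypothetical script
    dag=dag,
)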

Upvotes: 12

Views: 16543

Answers (3)

Ganesh

Reputation: 757

I ran into the same problem. I have daily, hourly, and half-hourly jobs.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

# timezone-aware start_date: Airflow 1.10+ interprets the schedule in this zone
local_tz = pendulum.timezone("Asia/Calcutta")

args = {
    'owner': 'ganesh',
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 25, tzinfo=local_tz),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test1',
    default_args=args,
    schedule_interval='30 00 * * *',
)

first_date = BashOperator(
    task_id='first_date',
    bash_command='date',
    dag=dag,
    env=None,
    output_encoding='utf-8',
)

second_date = BashOperator(
    task_id='second_date',
    bash_command='echo date',
    dag=dag,
    env=None,
    output_encoding='utf-8',
)

first_date >> second_date



Upvotes: 3

Breathe

Reputation: 724

You could write a Python util that rewrites your tz-based schedule into UTC: https://github.com/bloomberg/tzcron/blob/master/tzcron.py
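A minimal sketch of that idea for a simple daily schedule (the function name is made up; for zones with DST you would recompute per date, or use tzcron itself):

from datetime import datetime
import pytz

def local_daily_cron_to_utc(hour, minute, tz_name, on_date=None):
    # Hypothetical helper: turn "daily at HH:MM in tz_name" into the
    # equivalent UTC cron expression for a given date (the offset can
    # change with DST, hence the per-date parameter).
    tz = pytz.timezone(tz_name)
    d = on_date or datetime.utcnow().date()
    local_dt = tz.localize(datetime(d.year, d.month, d.day, hour, minute))
    utc_dt = local_dt.astimezone(pytz.utc)
    return '%d %d * * *' % (utc_dt.minute, utc_dt.hour)

print(local_daily_cron_to_utc(0, 0, 'Asia/Dubai'))  # -> '0 20 * * *'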

EDIT: there is a recent commit that makes Airflow timezone-aware: https://github.com/apache/incubator-airflow/commit/f1ab56cc6ad3b9419af94aaa333661c105185883

Upvotes: 0

Ash Berlin-Taylor

Reputation: 4048

Schedule interval can also be a "cron expression", which means you can easily run it at 20:00 UTC. That, coupled with "user_defined_filters", means you can, with a bit of trickery, get the behaviour you want:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

import pytz
tz = pytz.timezone('Asia/Dubai')


def localize_utc_tz(d):
    # execution_date arrives as a naive UTC datetime; attach the local zone
    return tz.fromutc(d)

default_args = {
    'start_date': datetime(2017, 11, 8),
}
dag = DAG(
    'plus_4_utc',
    default_args=default_args,
    schedule_interval='0 20 * * *',
    user_defined_filters={
        'localtz': localize_utc_tz,
    },
)
task = BashOperator(
    task_id='task_for_testing_file_log_handler',
    dag=dag,
    bash_command='echo UTC {{ ts }}, Local {{ execution_date | localtz }} next {{ next_execution_date | localtz }}',
)

This outputs:

UTC 2017-11-08T20:00:00, Local 2017-11-09 00:00:00+04:00 next 2017-11-10 00:00:00+04:00

You'll have to be careful about the "types" of the variables you use. For instance, ds and ts are strings, not datetime objects, which means the filter won't work on them.
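If you do need a localized ts, one workaround (a sketch, not part of the original answer) is a second filter that parses the string before localizing:

from datetime import datetime
import pytz

tz = pytz.timezone('Asia/Dubai')

def localize_ts(s):
    # ts looks like '2017-11-08T20:00:00': parse the naive UTC string,
    # then attach the local zone, as the localtz filter does above.
    return tz.fromutc(datetime.strptime(s, '%Y-%m-%dT%H:%M:%S'))

# registered alongside the other filter:
# user_defined_filters={'localtz': localize_utc_tz, 'localts': localize_ts}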

Upvotes: 14
