Kyle Bridenstine

Reputation: 6393

Airflow DAG Runs Daily Excluding Weekends?

Is it possible to create an Airflow DAG that runs every day except for Saturday and Sunday? It doesn't seem like this is possible since you only have a start_date and a schedule_interval.

I'm setting up a workflow that will process a batch of files every morning. The files won't be present on weekends, though, only Monday through Friday. I could simply use a timeout of 24 hours, which would make the Saturday and Sunday runs time out because the file would never appear on those days, but this would mark the DAG as failed for those two days, and that would not be very pleasant.

Upvotes: 5

Views: 13117

Answers (3)

Taylor D. Edmiston

Reputation: 13036

The answer from Zack already has a weekdays cron schedule that'll do what you want (0 0 * * 1-5), but I wanted to add an answer with a site for examples of common cron schedules, ahem, crontab expressions.

I use this with Airflow a lot to come up with a DAG's schedule_interval.

The main app to help you design a cron schedule interactively is at crontab.guru.

An example of a weekdays-only schedule: https://crontab.guru/every-weekday

More common examples (e.g., every half hour, every quarter, etc.): https://crontab.guru/examples.html
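
To make the expression easier to read before pasting it into a DAG, here is a minimal sketch that labels the five fields of a standard crontab expression. The helper name `label_cron_fields` and the field names are my own, purely for illustration; it only splits the string and does not validate the values.

```python
# Names of the five fields in a standard crontab expression, in order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def label_cron_fields(expression):
    """Split a five-field cron expression and pair each value with its field name.

    Hypothetical helper for illustration only; it does not validate the values.
    """
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


# The weekdays-only schedule from this thread.
weekday_schedule = label_cron_fields("0 0 * * 1-5")
for name, value in weekday_schedule.items():
    print(f"{name:>12}: {value}")
```

For the weekday schedule, the only field that restricts anything beyond the time of day is `day_of_week`, which is `1-5`.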

Upvotes: 5

Ben Gregory

Reputation: 974

I had a similar need and ended up putting this at the beginning of my DAGs. It's similar to the ShortCircuitOperator.

import logging
from airflow.models import SkipMixin, BaseOperator
from airflow.utils.decorators import apply_defaults


class pull_file_decision_operator(BaseOperator, SkipMixin):
   """Skip all downstream tasks unless the run matches the given weekday and hour."""

   template_fields = ('execution_date',)

   @apply_defaults
   def __init__(self,
                day_of_week,
                hour_of_day,
                execution_date,
                **kwargs):
       # Let BaseOperator handle task_id, dag, and the other standard arguments.
       super().__init__(**kwargs)

       self.day_of_week = day_of_week
       self.hour_of_day = hour_of_day
       self.execution_date = execution_date

   def execute(self, context):
       # https://docs.python.org/3/library/datetime.html#datetime.date.weekday
       # weekday() numbers Monday as 0 and Sunday as 6.
       # Assumes execution_date is a datetime-like object, not a string.

       run_dt = self.execution_date
       dow = self.day_of_week
       hod = self.hour_of_day

       if run_dt.weekday() == dow and run_dt.hour == hod:
          return True
       else:
          # Collect every task downstream of this one and mark it as skipped.
          downstream_tasks = context['task'].get_flat_relatives(upstream=False)
          logging.info('Skipping downstream tasks...')
          logging.info("Downstream task_ids %s", downstream_tasks)

          if downstream_tasks:
             self.skip(context['dag_run'],
                       context['ti'].execution_date,
                       downstream_tasks)
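
The decision logic in `execute` can be tried out without Airflow. Below is a rough sketch of the same check as plain functions; `is_weekday` and `matches_slot` are names I made up for illustration. A function like `is_weekday` could, for instance, be passed as the `python_callable` of a ShortCircuitOperator, which skips downstream tasks when the callable returns a falsy value. How the execution date reaches the callable depends on your Airflow version, so that wiring is left out here.

```python
from datetime import datetime


def is_weekday(run_dt):
    """Return True for Monday through Friday.

    datetime.weekday() numbers Monday as 0 and Sunday as 6,
    so Saturday and Sunday are 5 and 6.
    """
    return run_dt.weekday() < 5


def matches_slot(run_dt, day_of_week, hour_of_day):
    """Same condition as the operator above: one specific weekday and hour."""
    return run_dt.weekday() == day_of_week and run_dt.hour == hour_of_day


# 2024-01-05 is a Friday and 2024-01-06 is a Saturday.
friday = datetime(2024, 1, 5, 6, 0)
saturday = datetime(2024, 1, 6, 6, 0)

print(is_weekday(friday), is_weekday(saturday))
print(matches_slot(friday, day_of_week=4, hour_of_day=6))
```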

Upvotes: 1

Zack

Reputation: 2466

'schedule_interval': '0 0 * * 1-5' runs at 00:00 on every day-of-week from Monday through Friday.
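
As a sanity check on the day-of-week field, here is a small standard-library sketch that lists which dates in a sample week match `1-5`. It is not how Airflow or cron evaluate schedules internally; the helpers `parse_dow_field`, `cron_dow`, and `matching_dates` are hypothetical and only handle `*`, single numbers, comma lists, and simple ranges. Note that cron numbers Sunday as 0 (or 7), which differs from Python's `weekday()`.

```python
from datetime import date, timedelta


def parse_dow_field(field):
    """Expand a cron day-of-week field such as '1-5' or '1,3,5' into a set of ints.

    Illustrative only: supports '*', single numbers, comma lists, and ranges.
    """
    if field == "*":
        return set(range(7))
    days = set()
    for part in field.split(","):
        if "-" in part:
            start, end = part.split("-")
            days.update(range(int(start), int(end) + 1))
        else:
            days.add(int(part))
    # cron accepts 7 as an alias for Sunday (0).
    return {d % 7 for d in days}


def cron_dow(d):
    """Convert a date to cron's numbering: Sunday = 0 through Saturday = 6."""
    return d.isoweekday() % 7


def matching_dates(cron_expr, first_day, n_days):
    """Return the dates in the window whose weekday matches the fifth cron field."""
    allowed = parse_dow_field(cron_expr.split()[4])
    window = (first_day + timedelta(days=i) for i in range(n_days))
    return [d for d in window if cron_dow(d) in allowed]


# One full week starting Monday 2024-01-01.
week = matching_dates("0 0 * * 1-5", date(2024, 1, 1), 7)
print([d.isoformat() for d in week])
```

The Saturday and Sunday of that week are absent from the result, which is the behavior the question asks for.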

Upvotes: 14
