Reputation: 6393
Is it possible to create an Airflow DAG that runs every day except for Saturday and Sunday? It doesn't seem like this is possible since you only have a start_date and a schedule_interval.
I'm setting up a workflow that will process a batch of files every morning. The files won't be present on weekends, though, only Monday through Friday. I could simply use a timeout setting of 24 hours, which would effectively make the Saturday and Sunday runs time out because the file never appears on those days, but this would mark the DAG as failed for those two days, and that would not be very pleasant.
Upvotes: 5
Views: 13117
Reputation: 13036
The answer from Zack already has a weekdays cron schedule that'll do what you want (0 0 * * 1-5), but I wanted to add an answer with a site for examples of common cron schedules, ahem, crontab expressions.
I use this with Airflow a lot to come up with a DAG's schedule_interval.
The main app to help you design a cron schedule interactively is at crontab.guru.
An example weekdays-only schedule - https://crontab.guru/every-weekday
More common examples (e.g., every half hour, every quarter, etc) - https://crontab.guru/examples.html
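To see why 1-5 means "weekdays only", here is a small stdlib-only sketch that expands the day-of-week field (the fifth field) of a cron expression and checks a date against it. The helpers `parse_dow_field` and `cron_matches_day` are hypothetical, not part of Airflow or any cron library; they assume cron's usual numbering (0 or 7 = Sunday, 1 = Monday, ... 6 = Saturday), only handle `*`, single numbers, ranges and comma lists (no step values or MON-FRI names), and ignore the other four fields.

```python
from __future__ import annotations

from datetime import date


# Hypothetical helper: expand a cron day-of-week field into a set of day numbers.
# Cron numbers days 0-6 starting at Sunday; 7 is also accepted as Sunday.
def parse_dow_field(field: str) -> set[int]:
    days: set[int] = set()
    for part in field.split(","):
        if part == "*":
            days.update(range(7))
        elif "-" in part:
            start, end = (int(x) for x in part.split("-"))
            days.update(d % 7 for d in range(start, end + 1))
        else:
            days.add(int(part) % 7)
    return days


def cron_matches_day(expr: str, day: date) -> bool:
    # Fields: minute hour day-of-month month day-of-week; only the last is checked.
    dow_field = expr.split()[4]
    # isoweekday() is Monday=1 .. Sunday=7, so % 7 maps Sunday to cron's 0.
    return day.isoweekday() % 7 in parse_dow_field(dow_field)


week = [date(2024, 1, d) for d in range(1, 8)]  # Mon 1 Jan .. Sun 7 Jan 2024
print([d.strftime("%a") for d in week if cron_matches_day("0 0 * * 1-5", d)])
# → ['Mon', 'Tue', 'Wed', 'Thu', 'Fri']
```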
Upvotes: 5
Reputation: 974
I had a similar need and ended up putting this at the beginning of my DAGs; it's similar to the ShortCircuitOperator.
import logging

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class pull_file_decision_operator(BaseOperator, SkipMixin):
    """Continue only when the run's execution date falls on the given
    weekday and hour; otherwise skip every downstream task."""

    @apply_defaults
    def __init__(self,
                 day_of_week,   # 0 = Monday ... 6 = Sunday, as in datetime.weekday()
                 hour_of_day,   # 0-23
                 *args,
                 **kwargs):
        # BaseOperator must be initialised so task_id, dag, etc. are set.
        super().__init__(*args, **kwargs)
        self.day_of_week = day_of_week
        self.hour_of_day = hour_of_day

    def execute(self, context):
        # https://docs.python.org/3/library/datetime.html#datetime.date.weekday
        # Take the execution date from the task context: a templated field
        # would be rendered to a string, which has no .weekday() method.
        run_dt = context['execution_date']
        if run_dt.weekday() == self.day_of_week and run_dt.hour == self.hour_of_day:
            return True

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        logging.info('Skipping downstream tasks...')
        logging.info("Downstream task_ids %s", downstream_tasks)
        if downstream_tasks:
            self.skip(context['dag_run'],
                      context['ti'].execution_date,
                      downstream_tasks)
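The decision logic itself does not need Airflow, so it can be pulled into plain functions and unit-tested. The stock ShortCircuitOperator takes a `python_callable` and skips downstream tasks when that callable returns a falsy value, so a callable like `is_weekday_run` below could play the same role as the custom operator for the weekdays-only case. Both function names are hypothetical, and how the callable receives the run's execution date from the task context depends on your Airflow version, so treat this as a sketch rather than drop-in DAG code.

```python
from datetime import datetime


# Hypothetical callable: datetime.weekday() numbers Monday as 0 and Sunday as 6.
def is_weekday_run(execution_date: datetime) -> bool:
    """True Monday-Friday, False on Saturday and Sunday."""
    return execution_date.weekday() < 5


# Hypothetical helper mirroring the operator's check: one weekday at one hour.
def should_pull_file(execution_date: datetime, day_of_week: int, hour_of_day: int) -> bool:
    return execution_date.weekday() == day_of_week and execution_date.hour == hour_of_day


print(is_weekday_run(datetime(2024, 1, 5)))  # Friday → True
print(is_weekday_run(datetime(2024, 1, 6)))  # Saturday → False
```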
Upvotes: 1
Reputation: 2466
'schedule_interval': '0 0 * * 1-5'
runs at 00:00 on every day-of-week from Monday through Friday.
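One detail worth keeping in mind: with Airflow's usual cron scheduling, a run is started once its schedule interval has closed, i.e. at the next tick of the expression after the run's execution_date. For '0 0 * * 1-5' that means the run labelled Friday is triggered at the following Monday 00:00, and no run starts on Saturday or Sunday. The sketch below computes that next tick with the standard library only; `next_weekday_midnight` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta


# Hypothetical helper: the next 00:00 on Monday-Friday strictly after `dt`,
# i.e. the next tick of the cron expression '0 0 * * 1-5'.
def next_weekday_midnight(dt: datetime) -> datetime:
    candidate = datetime(dt.year, dt.month, dt.day) + timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


friday_run = datetime(2024, 1, 5)  # execution_date of a Friday run
print(next_weekday_midnight(friday_run))  # → 2024-01-08 00:00:00 (Monday)
```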
Upvotes: 14