igr

Reputation: 10604

Airflow timetable that combines multiple cron expressions?

I have several cron expressions that I need to apply to a single DAG. There is no way to express them with one single cron expression.

Airflow 2.2 introduced Timetable. Is there an implementation that takes a list of cron expressions?

Upvotes: 5

Views: 5240

Answers (2)

hi_i_m_GTooth

Reputation: 1

If you hit the error Failed to serialize DAG 'multi_cron_timetable': Timetable class 'example.utils.timetable.MultiCronTimetable' is not registered, you need to put your timetable.py into $AIRFLOW_HOME/plugins.

Important Note

Make sure you set up __init__.py properly, as described in this tutorial. The import from operators.airflow_operator_template import MyOwnOperator in the tutorial is incorrect; you can ignore airflow_operator_template, since it does not appear in the tutorial's Directory Structure.
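
As a rough sketch (the file and folder names other than plugins/ are illustrative, not required), a layout along these lines works for registering the timetable:

# $AIRFLOW_HOME/
# ├── dags/
# │   └── timetable_test.py      # your DAG file
# └── plugins/
#     ├── __init__.py            # can be empty
#     └── timetable.py           # MultiCronTimetable + the AirflowPlugin subclass
#
# Airflow adds the plugins folder to sys.path, so a DAG file can then import directly:
from timetable import MultiCronTimetable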

Upvotes: 0

brki

Reputation: 2780

I was looking for the same thing, but didn't find anything. It would be nice if a standard one came with Airflow.

Here's a 0.1 version that I wrote for Airflow 2.2.5.

# This file is <airflow plugins directory>/timetable.py

from typing import Any, Dict, List, Optional
import pendulum
from croniter import croniter
from pendulum import DateTime, Duration, timezone, instance as pendulum_instance
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.exceptions import AirflowTimetableInvalid


class MultiCronTimetable(Timetable):
    valid_units = ['minutes', 'hours', 'days']

    def __init__(self,
                 cron_defs: List[str],
                 timezone: str = 'Europe/Berlin',
                 period_length: int = 0,
                 period_unit: str = 'hours'):

        self.cron_defs = cron_defs
        self.timezone = timezone
        self.period_length = period_length
        self.period_unit = period_unit

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """
        Determines date interval for manually triggered runs.
        This is simply (now - period) to now.
        """
        end = run_after
        if self.period_length == 0:
            start = end
        else:
            start = self.data_period_start(end)
        return DataInterval(start=start, end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction) -> Optional[DagRunInfo]:
        """
        Determines when the DAG should be scheduled.

        """

        if restriction.earliest is None:
            # No start_date. Don't schedule.
            return None

        is_first_run = last_automated_data_interval is None

        if is_first_run:
            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(restriction.earliest)

            else:
                scheduled_time = self.previous_scheduled_run_time()
                if scheduled_time is None:
                    # No previous cron time matched. Find one in the future.
                    scheduled_time = self.next_scheduled_run_time()
        else:
            last_scheduled_time = last_automated_data_interval.end

            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(last_scheduled_time)

            else:
                scheduled_time = self.previous_scheduled_run_time()

                if scheduled_time is None or scheduled_time == last_scheduled_time:
                    # No previous cron time matched, or the matched cron time
                    # was the last execution time; find the next future time.
                    scheduled_time = self.next_scheduled_run_time()

                elif scheduled_time > last_scheduled_time:
                    # Matched cron time was after last execution time, but before now.
                    # Use this cron time
                    pass

                else:
                    # The last execution time is after the most recent matching cron time.
                    # Next scheduled run will be in the future
                    scheduled_time = self.next_scheduled_run_time()

        if scheduled_time is None:
            return None

        if restriction.latest is not None and scheduled_time > restriction.latest:
            # Over the DAG's scheduled end; don't schedule.
            return None

        start = self.data_period_start(scheduled_time)
        return DagRunInfo(run_after=scheduled_time, data_interval=DataInterval(start=start, end=scheduled_time))

    def data_period_start(self, period_end: DateTime):
        return period_end - Duration(**{self.period_unit: self.period_length})

    def croniter_values(self, base_datetime=None):
        if not base_datetime:
            tz = timezone(self.timezone)
            base_datetime = pendulum.now(tz)

        return [croniter(expr, base_datetime) for expr in self.cron_defs]

    def next_scheduled_run_time(self, base_datetime: Optional[DateTime] = None):
        min_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            next_date = cron.get_next(DateTime)
            if not min_date:
                min_date = next_date
            else:
                min_date = min(min_date, next_date)
        if min_date is None:
            return None
        return pendulum_instance(min_date)

    def previous_scheduled_run_time(self, base_datetime: Optional[DateTime] = None):
        """
        Get the most recent time in the past that matches one of the cron schedules
        """
        max_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            prev_date = cron.get_prev(DateTime)
            if not max_date:
                max_date = prev_date
            else:
                max_date = max(max_date, prev_date)
        if max_date is None:
            return None
        return pendulum_instance(max_date)


    def validate(self) -> None:
        if not self.cron_defs:
            raise AirflowTimetableInvalid("At least one cron definition must be present")

        if self.period_unit not in self.valid_units:
            raise AirflowTimetableInvalid(f'period_unit must be one of {self.valid_units}')

        if self.period_length < 0:
            raise AirflowTimetableInvalid('period_length must not be less than zero')

        try:
            self.croniter_values()
        except Exception as e:
            raise AirflowTimetableInvalid(str(e))

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression.
        """
        return ' || '.join(self.cron_defs) + f' [TZ: {self.timezone}]'

    def serialize(self) -> Dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized.
        """
        return dict(cron_defs=self.cron_defs,
                    timezone=self.timezone,
                    period_length=self.period_length,
                    period_unit=self.period_unit)

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable":
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization.
        """
        return cls(**data)


class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [MultiCronTimetable]

To use it, you provide a list of cron expressions, and optionally a timezone string, a period length, and a period unit.

For my use case I don't actually need the period length and unit, which are used to determine the DAG's data_interval. You can just leave period_length at its default of 0 if your DAG doesn't care about the data_interval.
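
For illustration, this is roughly the arithmetic data_period_start does (the timestamps here are made up for the example): with period_length=10 and period_unit='minutes', a run scheduled at 12:00 gets a data interval from 11:50 to 12:00.

import pendulum
from pendulum import Duration

# A run scheduled at 2022-06-03 12:00 Europe/Berlin with a 10-minute period...
end = pendulum.datetime(2022, 6, 3, 12, 0, tz='Europe/Berlin')
start = end - Duration(minutes=10)    # same as Duration(**{'minutes': 10})
# ...gets the data interval 2022-06-03 11:50 -> 2022-06-03 12:00
print(start, end)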

I tried to imitate the standard schedule_interval behaviour. For example, if catchup = False and the DAG could have been triggered several times since its last run (for whatever reason: the DAG ran longer than expected, the scheduler wasn't running, or it's the DAG's very first time being scheduled), then the DAG will be scheduled for the most recent matching time in the past.

I haven't really tested it with catchup = True, but in theory it would run for every matching cron time since the DAG's start_date (but only once per distinct time, for example with */30 * * * * and 0 * * * * the DAG would run twice per hour, not three times).
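
As a rough illustration of that de-duplication (a standalone snippet, not part of the timetable; the base time is made up), merging the fire times of those two expressions with croniter shows the shared top-of-the-hour slot appearing only once:

from datetime import datetime
from croniter import croniter

base = datetime(2022, 6, 3, 12, 0)

# Collect the next few fire times of each expression and merge them.
times = set()
for expr in ('*/30 * * * *', '0 * * * *'):
    it = croniter(expr, base)
    for _ in range(3):
        times.add(it.get_next(datetime))

print(sorted(times))
# 12:30, 13:00, 13:30, 14:00, 15:00 -- 13:00 matches both expressions but
# appears only once, so the DAG runs twice per hour, not three times.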

Example DAG file:

from time import sleep
import airflow
from airflow.operators.python import PythonOperator
import pendulum
from timetable import MultiCronTimetable

def sleepy_op():
    sleep(660)


with airflow.DAG(
        dag_id='timetable_test',
        start_date=pendulum.datetime(2022, 6, 2, tz=pendulum.timezone('America/New_York')),
        timetable=MultiCronTimetable(['*/5 * * * *', '*/3 * * * fri,sat', '1 12 3 * *'], timezone='America/New_York', period_length=10, period_unit='minutes'),
        catchup=False,
        max_active_runs=1) as dag:

    sleepy = PythonOperator(
        task_id='sleepy',
        python_callable=sleepy_op
    )

Upvotes: 7
