elvikingo


How can I set different schedules for the same DAG based on different days in Airflow?

I've got two DAGs that are defined something like this:

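from airflow import DAG

# Thursday/Friday: 12:00, 13:00, 15:00 and 19:00
my_dag_thu_fri = DAG('my_dag_thu_and_friday',
                     catchup=False,
                     default_args=default_args,
                     schedule_interval='0 12,13,15,19 * * THU,FRI')

# Saturday/Sunday: 13:00 and 17:00 (separate variable name, so the first DAG is not overwritten)
my_dag_sat_sun = DAG('my_dag_sat_and_sun',
                     catchup=False,
                     default_args=default_args,
                     schedule_interval='0 13,17 * * SAT,SUN')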

They run the same operator and the same code, just on different schedules depending on whether it is Thu/Fri or Sat/Sun. Is there a way I can specify the cron interval so that I only need one DAG that handles the scheduling conditionally?

Thanks



Answers (1)

Elad Kalif


There is no clean and simple solution at the moment. However, AIP-39 (Richer scheduler_interval) is going to tackle this, so it should become easy in future Airflow versions.

For the moment your options are:

  1. Create one DAG with the combined cron '0 12,13,15,17,19 * * THU,FRI,SAT,SUN' and add a branching operator (e.g. BranchPythonOperator) that decides whether to execute your operator or a DummyOperator. For example, for the 17:00 run on Thursday Airflow will execute the DummyOperator (thus do nothing); the same applies to the 12:00, 15:00 and 19:00 runs on Saturday and Sunday.
  2. Stay with 2 DAGs as you already do.
  3. Stay with 2 DAGs, but build them with a function that returns a DAG object, so you don't have to maintain duplicated code:


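from airflow import DAG
from airflow.operators.bash import BashOperator


def create_dag(dag_id, schedule, default_args):
    """Build one DAG; every copy gets the same tasks, only the schedule differs."""
    dag = DAG(
        dag_id,
        catchup=False,
        schedule_interval=schedule,
        default_args=default_args)
    with dag:
        # Placeholder command: put the operator you already use here.
        BashOperator(task_id='my_task', bash_command='echo "running"')
    return dag


list_of_dags = [
    ('my_dag_thu_and_friday', '0 12,13,15,19 * * THU,FRI'),
    ('my_dag_sat_and_sun', '0 13,17 * * SAT,SUN'),
]

default_args = {
    'owner': 'airflow',
    # ... the rest of your default args, including start_date
}

# Register each DAG under its own module-level name so the scheduler finds it.
for dag_id, dag_schedule in list_of_dags:
    globals()[dag_id] = create_dag(dag_id, dag_schedule, default_args)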
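For option 1, the only real logic is the branch callable: given the time a run actually fires, decide whether that slot belongs to the original Thu/Fri or Sat/Sun schedule. Below is a minimal sketch under stated assumptions: the hour sets are copied by hand from the two cron expressions, and the task ids `my_task` and `do_nothing` (the DummyOperator) are hypothetical. It has no Airflow imports, so it can be tested on its own.

```python
from datetime import datetime

# Hours that really should run, per weekday (Monday=0 ... Sunday=6).
# Copied by hand from '0 12,13,15,19 * * THU,FRI' and '0 13,17 * * SAT,SUN'.
WANTED_HOURS = {
    3: {12, 13, 15, 19},  # Thursday
    4: {12, 13, 15, 19},  # Friday
    5: {13, 17},          # Saturday
    6: {13, 17},          # Sunday
}

# Hypothetical task ids of the two branches in the combined DAG.
RUN_TASK_ID = "my_task"
SKIP_TASK_ID = "do_nothing"  # the DummyOperator


def choose_branch(fire_time: datetime) -> str:
    """Return the task_id to follow for a run that fires at fire_time."""
    if fire_time.hour in WANTED_HOURS.get(fire_time.weekday(), set()):
        return RUN_TASK_ID
    return SKIP_TASK_ID


if __name__ == "__main__":
    # 2021-06-03 is a Thursday: 17:00 is an extra slot of the combined cron.
    print(choose_branch(datetime(2021, 6, 3, 17)))  # prints "do_nothing"
    # 2021-06-05 is a Saturday: 17:00 is a wanted slot.
    print(choose_branch(datetime(2021, 6, 5, 17)))  # prints "my_task"
```

Inside the DAG this would become the `python_callable` of a BranchPythonOperator, for example `lambda **context: choose_branch(context['data_interval_end'])` on Airflow 2.2+. Use the end of the data interval rather than `execution_date`/`logical_date`: with a cron `schedule_interval` the logical date is the start of the interval (the previous cron tick), so Thursday's 13:00 run would otherwise be classified as 12:00. If the DAG does not run in UTC, convert the time to the DAG's timezone before checking weekday and hour.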

