Reputation: 43
Is there a way to set up/write a custom schedule_interval in an Airflow DAG? What I'm looking for is a schedule where the DAG runs daily except on holidays (like Christmas, Labor Day, Independence Day, etc.).
This isn't possible with standard cron expressions. Any help or guidance is much appreciated.
Upvotes: 1
Views: 1234
Reputation: 15971
There is no native support for this type of scheduling, but you can solve it by adding a ShortCircuitOperator
to the beginning of your workflow.
This operator executes a Python callable. If the callable returns a truthy value, the workflow continues; if it returns a falsy value, all downstream tasks are marked as skipped.
A possible solution:
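```python
from datetime import datetime

import holidays
from airflow import DAG
from airflow.operators.dummy import DummyOperator  # EmptyOperator in Airflow>=2.3
from airflow.operators.python import ShortCircuitOperator

# Airflow 1.10.x uses airflow.operators.python_operator / dummy_operator instead,
# and the ShortCircuitOperator below also needs provide_context=True.

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),  # placeholder start date
}


def decide(**kwargs):
    # Select country
    us_holidays = holidays.US()
    # execution_date is a datetime; look up its calendar date
    if kwargs['execution_date'].date() in us_holidays:
        return False  # Skip workflow if it's a holiday.
    return True


dag = DAG(
    dag_id='mydag',
    schedule_interval='@daily',
    default_args=default_args,
)

start_op = ShortCircuitOperator(
    task_id='start_task',
    python_callable=decide,
    dag=dag,
)

# Replace this with your actual Operator in your workflow.
next_op = DummyOperator(
    task_id='next_task',
    dag=dag,
)

start_op >> next_op
```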
This solution is based on the answer provided in Detecting a US Holiday. I didn't test it, but it should work. In any case, you can replace the logic in decide
with any method that detects whether a date is a holiday.
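If you'd rather not depend on the holidays package, here is a minimal stdlib-only sketch of such a replacement. The helper names `first_monday_of_september` and `is_holiday` are hypothetical, and the sketch is an assumption-laden simplification: it only covers the three holidays named in the question and ignores "observed" shifts when a holiday falls on a weekend (which holidays.US() does handle).

```python
from datetime import date, datetime, timedelta


def first_monday_of_september(year: int) -> date:
    """US Labor Day: the first Monday in September."""
    d = date(year, 9, 1)
    # weekday(): Monday == 0, so step forward to the first Monday on or after Sept 1
    return d + timedelta(days=(0 - d.weekday()) % 7)


def is_holiday(day: date) -> bool:
    """Hypothetical helper: True for Independence Day, Labor Day or Christmas Day.

    Does not handle weekend "observed" dates.
    """
    fixed = {(7, 4), (12, 25)}  # Independence Day, Christmas Day
    if (day.month, day.day) in fixed:
        return True
    return day == first_monday_of_september(day.year)


def decide(**kwargs) -> bool:
    # Same contract as the ShortCircuitOperator callable: False skips downstream tasks.
    return not is_holiday(kwargs['execution_date'].date())


if __name__ == "__main__":
    print(decide(execution_date=datetime(2021, 12, 25)))  # False (Christmas)
    print(decide(execution_date=datetime(2021, 12, 27)))  # True
```

Hand-rolling the calendar keeps the DAG free of an extra dependency, but every new holiday (and every observed-date rule) has to be added by hand.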
Upvotes: 0
Reputation: 1438
Use the BranchPythonOperator,
or create a new operator that inherits from BaseBranchOperator,
where you implement the skipping logic. I believe you'll need a DummyOperator
as the "skip" branch and your regular DAG flow as the other arm. For your cron expression, use whatever the normal schedule should be, and implement the custom skips in the task that handles the branching.
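A branch callable returns the task_id of the branch to follow. The sketch below shows only that callable, so it runs without Airflow installed. The task ids `skip_holiday` and `run_workflow` and the hard-coded `HOLIDAYS` set are hypothetical; in a real DAG you would likely use a proper holiday lookup such as the holidays package.

```python
from datetime import date, datetime

# Hypothetical holiday calendar for illustration only.
HOLIDAYS = {
    date(2021, 7, 4),    # Independence Day
    date(2021, 9, 6),    # Labor Day
    date(2021, 12, 25),  # Christmas Day
}


def choose_branch(**kwargs) -> str:
    """Return the task_id of the branch to follow.

    'skip_holiday' (hypothetical) would be a DummyOperator; 'run_workflow'
    (hypothetical) would be the first task of the regular DAG flow.
    """
    run_date = kwargs['execution_date'].date()
    if run_date in HOLIDAYS:
        return 'skip_holiday'
    return 'run_workflow'


if __name__ == "__main__":
    print(choose_branch(execution_date=datetime(2021, 9, 6)))  # skip_holiday
    print(choose_branch(execution_date=datetime(2021, 9, 7)))  # run_workflow
```

To wire it up, pass `choose_branch` as `python_callable` to a BranchPythonOperator whose downstream tasks are `skip_holiday` and `run_workflow`; Airflow then marks the branch that was not returned as skipped.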
Upvotes: 0