Reputation: 63
Suppose my DAG looks like this:
[setup] -> [processing-task] -> [end]
How can I schedule this DAG to run periodically while running the [setup] task only once (on the first scheduled run) and skipping it on all later runs?
Upvotes: 5
Views: 2765
Reputation: 280
Here is a way to do it without creating a new class. I found this simpler than the accepted answer, and it worked well for my use case.
It might be useful for others!
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id='your_dag_id',
    default_args={
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Dag with initial setup task that only runs on start_date',
    start_date=datetime(2000, 1, 1),
    # Runs daily at 1 am
    schedule_interval='0 1 * * *',
    # catchup must be True if start_date is before datetime.now(),
    # otherwise the run for start_date is never created
    catchup=True,
    max_active_runs=1,
) as dag:

    def branch_fn(**kwargs):
        # start_date must equal data_interval_start on the first run.
        # The DAG is daily, but because schedule_interval fires at 1 am,
        # data_interval_start is 2000-01-01 01:00:00 on the first run,
        # while start_date is 2000-01-01 00:00:00. Truncate to midnight
        # before comparing.
        date = kwargs['data_interval_start'].replace(hour=0, minute=0, second=0, microsecond=0)
        if date == dag.start_date:
            return 'initial_task'
        else:
            return 'skip_initial_task'

    # Picks which of the two downstream tasks to follow
    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=branch_fn,
    )

    initial_task = DummyOperator(
        task_id="initial_task"
    )

    skip_initial_task = DummyOperator(
        task_id="skip_initial_task"
    )

    next_task = DummyOperator(
        task_id="next_task",
        # This is important, otherwise next_task would be skipped
        # whenever one of its two upstream tasks is skipped
        trigger_rule="one_success"
    )

    branch_task >> [initial_task, skip_initial_task] >> next_task
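If you want to sanity-check the date comparison without starting Airflow, here is a minimal sketch of the same logic using only the standard library. It assumes plain timezone-aware `datetime` objects in place of the values Airflow passes in, and `pick_branch` and `START_DATE` are hypothetical names made up for this illustration, not part of Airflow.

```python
from datetime import datetime, timezone

# Assumption: stands in for dag.start_date, which Airflow stores as timezone-aware
START_DATE = datetime(2000, 1, 1, tzinfo=timezone.utc)


def pick_branch(data_interval_start, start_date=START_DATE):
    """Return the task_id to follow, mirroring branch_fn above."""
    # Truncate to midnight so a 1 am schedule still matches start_date
    day = data_interval_start.replace(hour=0, minute=0, second=0, microsecond=0)
    return 'initial_task' if day == start_date else 'skip_initial_task'


# First scheduled run: the interval starts at 01:00 on the start date
first_run = datetime(2000, 1, 1, 1, 0, tzinfo=timezone.utc)
# Any later run: the interval starts on a different day
second_run = datetime(2000, 1, 2, 1, 0, tzinfo=timezone.utc)

print(pick_branch(first_run))   # initial_task
print(pick_branch(second_run))  # skip_initial_task
```

Note that the truncation only works for schedules that run at most once per day. With an hourly schedule, every run on the start date would match and the setup task would run more than once.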
Upvotes: 1
Reputation: 874
Check out this post on Medium, which describes how to implement a "run once" operator. I have used it successfully several times.
Upvotes: 3