Ramavtar Malav

Reputation: 63

Airflow Scheduling: how to run initial setup task only once?

Suppose my DAG is:

[setup] -> [processing-task] -> [end].

How can I schedule this DAG to run periodically while running the [setup] task only once (on the first scheduled run) and skipping it on all later runs?

Upvotes: 5

Views: 2765

Answers (2)

adjuric

Reputation: 280

Here is a way to do it without the need to create a new class. I found this simpler than the accepted answer, and it worked well for my use case.

Might be useful for others!

from datetime import datetime, timedelta

from airflow import DAG
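# DummyOperator is deprecated in later Airflow 2 releases in favour of
# airflow.operators.empty.EmptyOperator, which can be used the same way here.
from airflow.operators.dummy import DummyOperator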
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id='your_dag_id',
    default_args={
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Dag with initial setup task that only runs on start_date',
    start_date=datetime(2000, 1, 1),
    # Runs daily at 1 am
    schedule_interval='0 1 * * *',
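    # catchup must be True when start_date is in the past: with catchup=False the
    # scheduler starts from the latest interval, so no run has
    # data_interval_start == start_date and initial_task never runs.
    # Note that with a start_date in 2000 this creates one run for every day since then.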
    catchup=True,
    max_active_runs=1,
) as dag:

    def branch_fn(**kwargs):
        # On the first run, data_interval_start must match dag.start_date.
        # Because the schedule fires at 01:00, the first data_interval_start is
        # 2000-01-01 01:00:00 rather than 2000-01-01 00:00:00, so truncate it
        # to midnight before comparing.
        date = kwargs['data_interval_start'].replace(hour=0, minute=0, second=0, microsecond=0)
        if date == dag.start_date:
            return 'initial_task'
        else:
            return 'skip_initial_task'

    branch_task = BranchPythonOperator(
        task_id='branch_task', 
        python_callable=branch_fn,
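        # provide_context=True is not needed in Airflow 2: the context
        # (including data_interval_start) reaches branch_fn through **kwargs.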
    )

    initial_task = DummyOperator(
        task_id="initial_task"
    )
    
    skip_initial_task = DummyOperator(
        task_id="skip_initial_task"
    )

    next_task = DummyOperator(
        task_id="next_task",
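        # Required: one branch is always skipped, and with the default all_success
        # rule next_task would be skipped too ("none_failed_min_one_success" is
        # another common choice for the task after a branch).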
        trigger_rule="one_success"
    )

    branch_task >> [initial_task, skip_initial_task] >> next_task
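To check the branching logic outside Airflow, here is a plain-Python sketch that mimics `branch_fn` for the first few scheduled runs. It assumes a daily 01:00 schedule and a midnight `start_date`, as in the DAG above; `first_data_interval_start` and `choose_branch` are hypothetical helpers, and naive datetimes stand in for the timezone-aware UTC values Airflow actually passes.

```python
from datetime import datetime, timedelta

# Assumption: a daily schedule firing at 01:00 (cron '0 1 * * *') with a
# midnight start_date, mirroring the DAG above. Airflow passes timezone-aware
# UTC datetimes; naive datetimes keep this sketch standalone.
START_DATE = datetime(2000, 1, 1)
RUN_HOUR = 1


def first_data_interval_start(start_date: datetime, hour: int = RUN_HOUR) -> datetime:
    """Return the first daily `hour:00` tick at or after start_date."""
    tick = start_date.replace(hour=hour, minute=0, second=0, microsecond=0)
    if tick < start_date:
        tick += timedelta(days=1)
    return tick


def choose_branch(data_interval_start: datetime, start_date: datetime) -> str:
    """Mirror branch_fn: truncate to midnight, then compare with start_date."""
    day = data_interval_start.replace(hour=0, minute=0, second=0, microsecond=0)
    return "initial_task" if day == start_date else "skip_initial_task"


# The first four runs created with catchup=True, one per day.
first = first_data_interval_start(START_DATE)
runs = [first + timedelta(days=i) for i in range(4)]
for run in runs:
    # prints "2000-01-01 01:00:00 -> initial_task", then skip_initial_task
    # for the next three days
    print(run, "->", choose_branch(run, START_DATE))

# With catchup=False the first run would begin at a recent interval instead
# (this date is hypothetical), so initial_task would never be chosen.
print(choose_branch(datetime(2024, 5, 1, 1), START_DATE))  # skip_initial_task
```

The midnight truncation only lines up because `start_date` is exactly midnight. With a `start_date` of `datetime(2000, 1, 1, 2)`, the first 01:00 tick is 2000-01-02 01:00, which truncates to 2000-01-02 00:00 and never equals `start_date`, so the setup branch would never be taken.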

Upvotes: 1

bricca

Reputation: 874

Check out this post on Medium, which describes how to implement a "run once" operator. I have used it successfully several times.
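The post's code isn't reproduced here. As an illustration of the general idea only (not necessarily how the post implements it), a run-once step can check a persisted marker before doing its work and record the marker afterwards. This is a plain-Python sketch: `run_once` and the key `my_dag_setup_done` are hypothetical, and a dict stands in for whatever persistent store you would use from Airflow (for example an Airflow Variable or a table in your own database).

```python
from typing import Callable, MutableMapping


def run_once(store: MutableMapping[str, str], key: str, setup: Callable[[], None]) -> bool:
    """Run setup() unless `key` is already marked done in `store`.

    Returns True if setup() ran on this call, False if it was skipped.
    """
    if store.get(key) == "done":
        return False
    setup()
    # Write the marker only after setup() returns, so a failed setup is
    # retried on the next run instead of being recorded as done.
    store[key] = "done"
    return True


# A plain dict stands in for persistent storage; the key name is hypothetical.
markers: dict = {}
calls: list = []
for _ in range(3):  # three scheduled runs
    run_once(markers, "my_dag_setup_done", lambda: calls.append("setup"))
print(calls)  # ['setup']
```

Because this keys off recorded state rather than dates, it does not depend on `catchup` or on `start_date` being midnight. The check-then-set is not atomic, which is fine with `max_active_runs=1` but would need a lock or a conditional write if runs can overlap.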

Upvotes: 3
