Spandan Singh

Reputation: 712

Apache Airflow: Delay a task for some period of time

I am trying to execute a task after 5 minutes from the parent task inside a DAG.

DAG : Task 1 ----> Wait for 5 minutes ----> Task 2

How can I achieve this in Apache Airflow? Thanks in advance.

Upvotes: 20

Views: 49525

Answers (4)

Makrushin Evgenii

Reputation: 1181

You can return pendulum.now() from the first task and use it to initialize a TimeSensor:

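import pendulum
from airflow.decorators import task
from airflow.sensors.time_sensor import TimeSensor

@task
def task_1():
  return pendulum.now()

@task
def task_2():
  ...


first_task_finished_at = task_1()
TimeSensor(task_id="wait_5_minutes", target_time=first_task_finished_at.add(minutes=5)) >> task_2()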

TimeDeltaSensor is not suitable, because it waits for the specified delta after context["data_interval_end"], not after the upstream task finishes:

def poke(self, context: Context):
  target_dttm = context["data_interval_end"]
  target_dttm += self.delta
  self.log.info("Checking if the time (%s) has come", target_dttm)
  return timezone.utcnow() > target_dttm
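
To make the difference concrete, here is a minimal standard-library sketch of the comparison that poke() performs. It is not Airflow code: the helper names (time_delta_target, should_proceed) and the timestamps are hypothetical, chosen only to show what happens when Task 1 finishes well after the data interval ends.

```python
from datetime import datetime, timedelta, timezone


def time_delta_target(data_interval_end: datetime, delta: timedelta) -> datetime:
    """Moment the sensor waits for: the interval end plus the delta."""
    return data_interval_end + delta


def should_proceed(now: datetime, data_interval_end: datetime, delta: timedelta) -> bool:
    """Same comparison as the poke() shown above, with 'now' passed in explicitly."""
    return now > time_delta_target(data_interval_end, delta)


# Hypothetical values: Task 1 finishes 20 minutes after the data interval ends.
interval_end = datetime(2023, 9, 1, 0, 0, tzinfo=timezone.utc)
task_1_finished = datetime(2023, 9, 1, 0, 20, tzinfo=timezone.utc)
delta = timedelta(minutes=5)

# The sensor's target is anchored to the interval end, not to Task 1.
target = time_delta_target(interval_end, delta)

# By the time Task 1 is done, the target has already passed, so no wait happens.
already_passed = should_proceed(task_1_finished, interval_end, delta)

# What the question actually asks for: five minutes after Task 1 finished.
wanted = task_1_finished + delta
```

With these numbers the sensor would succeed on its first poke, while the desired start time for Task 2 is still five minutes away.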

Upvotes: 0

yingw

Reputation: 307

It's 2023 now, and although this is already linked in one of the comments, the way to implement this is with TimeDeltaSensor() or TimeDeltaSensorAsync():

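import pendulum
from airflow.decorators import dag
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync

@dag(
    start_date=pendulum.datetime(2023, 9, 1),
    schedule=None,
    catchup=False,
    tags=["example"],
)
def example_wait():
    # Sensor that succeeds once the delta has elapsed; chain it between your tasks
    t10 = TimeDeltaSensor(task_id="wait_some_seconds", delta=pendulum.duration(seconds=10))

# Calling the decorated function registers the DAG
example_wait()
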

See docs

Upvotes: 8

y2k-shubham

Reputation: 11607

This behaviour can be achieved by introducing a task that forces a delay of the specified duration between your Task 1 and Task 2.

This can be done using PythonOperator:

import time
from airflow.operators.python import PythonOperator

delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=my_dag,
                                                   python_callable=lambda: time.sleep(300))

task_1 >> delay_python_task >> task_2
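
If the delay needs to be configurable, one option is a small named function in place of the lambda. The sketch below is only an illustration: delay is a hypothetical helper, not part of Airflow, and the wiring in the trailing comment assumes the same my_dag object as the snippet above.

```python
import time


def delay(seconds: float) -> None:
    """Block the calling task for the given number of seconds."""
    if seconds < 0:
        raise ValueError("seconds must be non-negative")
    time.sleep(seconds)


# Assumed wiring (not executed here): pass the function and its argument to the operator,
#   PythonOperator(task_id="delay_python_task", dag=my_dag,
#                  python_callable=delay, op_kwargs={"seconds": 300})
```

A named function shows up with a readable name in logs and tracebacks, and the duration can be changed through op_kwargs without editing the callable.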

Or using BashOperator:

from airflow.operators.bash import BashOperator
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
                                             dag=my_dag,
                                             bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2

Note: The given code-snippets are NOT tested


UPDATE-1

Here are some other ways of introducing delay

  • UPDATE: do NOT use this, as pointed out by @Vit.ai. Original point: on_success_callback / on_failure_callback: Depending on whether Task 2 is supposed to run upon success or failure of Task 1, you can pass lambda: time.sleep(300) in either of these params of Task 1.
  • pre_execute() / post_execute(): Invoking time.sleep(300) in Task 1's post_execute() or Task 2's pre_execute() would also have the same effect. Of course, this would involve modifying the code of your tasks (1 or 2), so it is better avoided.

Personally I would prefer the extra task approach because it makes things more explicit and doesn't falsely exaggerate the runtime of your Task 1 or Task 2

Upvotes: 33

Vit.ai

Reputation: 51

@y2k-shubham gave the best answer to date. However, I want to warn against using the callback solution: Airflow first marks the task as success and only then executes the callback, which means task2 will not see any delay. If you don't want to use a separate task, you can use something like this:

< ... >
task1 = DummyOperator(task_id='task1', dag=dag)
task1.post_execute = lambda **x: time.sleep(300)
task2 = DummyOperator(task_id='task2', dag=dag)

task1 >> task2

Upvotes: 3
