3nadh

Reputation: 76

Inducing sleep delays in an Airflow backfill DAG

For backfill jobs I have tried the TimeDeltaSensor, but it seems to wait until execution time + 60 minutes instead of current time + 60 minutes. Is there a way to induce a current time + 60 minute delay for backfill jobs?

Essentially, this delay is meant to keep me from hitting an API rate limit. I am currently using the task below for the delay, but it does not seem to work:

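    from datetime import timedelta
    from airflow.sensors.time_delta import TimeDeltaSensor  # Airflow 2.x; 1.10 uses airflow.sensors.time_delta_sensor

    TimeDeltaSensor(
        task_id='wait',
        delta=timedelta(minutes=60),
        dag=dag
    )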
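To see why the sensor doesn't wait during a backfill, here is a minimal plain-Python sketch of the comparison it makes. It assumes, as in recent Airflow 2.x releases, that the target is the run's data interval end plus `delta` (Airflow 1.10 used the next schedule after `execution_date`, which is just as far in the past for a backfill). No Airflow import is needed; `execution_based_target_reached` and the dates are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def execution_based_target_reached(interval_end, delta, now):
    """Mimic the sensor's check: is `now` past interval_end + delta?"""
    target = interval_end + delta
    return now > target


# Hypothetical values: a backfill run for a day in early 2020, checked "today".
backfill_interval_end = datetime(2020, 1, 2, tzinfo=timezone.utc)
now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)

# The target (2020-01-02 01:00 UTC) is years in the past, so the check
# succeeds on the very first poke and no real waiting happens.
print(execution_based_target_reached(backfill_interval_end, timedelta(minutes=60), now))
```

With those dates the check prints `True` immediately, which matches the behaviour described in the question.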

Upvotes: 1

Views: 1287

Answers (2)

Mike Taylor

Reputation: 699

You'll need your own custom operator or sensor that uses the task instance's start_date attribute. Measuring from start_date instead of the execution date is the only change you'd need to make to the library's TimeDeltaSensor.

If you copy the TimeDeltaSensor source code, you can override the poke method with something like:

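    from airflow.utils import timezone

    def poke(self, context):
        # Count the delay from when this task instance started running,
        # not from the run's execution date
        target_dttm = context["task_instance"].start_date
        target_dttm += self.delta
        return timezone.utcnow() > target_dttm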
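The same check, measured from the task's start instead of its execution date, can be sketched without Airflow. Here `started` stands in for `context["task_instance"].start_date`; the function name and the times are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def start_based_target_reached(start_date, delta, now):
    """Stand-in for the overridden poke: wait `delta` after the task started."""
    return now > start_date + delta


# Hypothetical: the backfilled task instance started running at 12:00 UTC.
started = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
delay = timedelta(minutes=60)

# Simulate pokes at several points after the start
for minutes in (0, 30, 59, 61):
    now = started + timedelta(minutes=minutes)
    print(minutes, start_based_target_reached(started, delay, now))
```

Only the 61-minute check is `True`, whatever the run's execution date. Keep in mind that a sensor in the default poke mode occupies a worker slot for the whole hour.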

Added:

Reading your question again, though, I see that you might just want an operator that sleeps in its execute method, or a Python callable with a sleep/back-off pattern that you run through the PythonOperator. Something like:

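    from time import sleep

    class APILimitError(Exception):
        """Placeholder for the error your API client raises when rate-limited."""

    def hit_api():
        pass  # call your API here

    def hit_and_back_off(sleep_seconds):
        while True:
            try:
                return hit_api()  # success: stop looping
            except APILimitError:
                # Rate limit hit: wait, then try again
                sleep(sleep_seconds)
            # Any other exception propagates and fails the task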
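If you'd rather not loop forever, a variation on the same pattern caps the number of attempts and doubles the wait each time. This is a sketch under assumptions: `APILimitError` is a hypothetical stand-in for your client's rate-limit exception, `call_with_backoff`, `max_attempts` and `base_delay` are illustrative names, and `sleep` is injectable so the logic can be tested without actually waiting.

```python
import time


class APILimitError(Exception):
    """Hypothetical: stands in for your client's rate-limit exception."""


def call_with_backoff(call, max_attempts=5, base_delay=60.0, sleep=time.sleep):
    """Call `call()`, retrying on APILimitError with exponentially growing waits.

    Re-raises the last APILimitError after `max_attempts` failed attempts.
    """
    for attempt in range(max_attempts):
        try:
            return call()
        except APILimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the task fail
            # Wait base_delay, 2*base_delay, 4*base_delay, ... between attempts
            sleep(base_delay * 2 ** attempt)
```

You could call it from the function you pass as the PythonOperator's `python_callable`, e.g. `call_with_backoff(hit_api)`. Other exceptions are not caught, so they still fail the task.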

Upvotes: 3

3nadh

Reputation: 76

The easiest solution I have found is to have another DAG that triggers scheduled backfills.
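The answer doesn't show the controller DAG. One way it might work, assuming the Airflow 2.x CLI (`airflow dags backfill --start-date START --end-date END DAG_ID`; Airflow 1.10 used `airflow backfill` instead) and that each scheduled run of the controller backfills one window, is to split the full range into small windows up front so the API calls are spread across the controller's schedule. The DAG id `my_api_dag`, the date range, and the window size are hypothetical.

```python
from datetime import date, timedelta


def backfill_windows(start, end, days_per_run):
    """Split the inclusive date range [start, end] into consecutive windows."""
    windows = []
    current = start
    while current <= end:
        window_end = min(current + timedelta(days=days_per_run - 1), end)
        windows.append((current, window_end))
        current = window_end + timedelta(days=1)
    return windows


def backfill_command(dag_id, window):
    """Build an Airflow 2.x CLI call that backfills one window."""
    start, end = window
    return (
        f"airflow dags backfill --start-date {start.isoformat()} "
        f"--end-date {end.isoformat()} {dag_id}"
    )


# Hypothetical target DAG and range: 10 days, 3 days per scheduled run.
windows = backfill_windows(date(2020, 1, 1), date(2020, 1, 10), days_per_run=3)
for w in windows:
    print(backfill_command("my_api_dag", w))
```

Each command could then be run by, for example, a BashOperator in the controller DAG, one window per scheduled run, so the gap between runs acts as the delay.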

Upvotes: 1
