Reputation: 76
For backfill jobs I have tried timedelta sensor, but it seem to check for Execution Time + 60 mins , instead of current time + 60 mins. Is there a way I can induce current time + 60 min delay for backfill jobs ?
Essentially this delay would avoid me reaching API rate limit. Currently using below task for delay, which does not seem to work.
task_id='wait',
delta=timedelta(minutes=60),
dag=dag
)
Upvotes: 1
Views: 1287
Reputation: 699
You'll need your own custom Operator or Sensor and use the task instance's start_date
attribute. Using the start_date
is the only change you'd need to make from the TimeDeltaSensor
in the library.
If you copy the TimeDeltaSensor source code you can override the poke method with something like:
# ...
target_dttm = context["task_instance"].start_date
target_dttm += self.delta
return timezone.utcnow() > target_dttm
Though reading your Question again, I see that you might just want an operator with a sleep in its execute. Or a python callable that has a sleep pattern in it and push that through the PythonOperator... something like
def hit_api():
pass
def hit_and_back_off:
while True:
try:
hit_api()
except APILimitError:
sleep(<SOME SLEEP>)
except e:
raise
Upvotes: 3
Reputation: 76
Easiest solution I have found is to have another DAG , which trigers scheduled backfills
Upvotes: 1