Reputation: 779
I want to schedule a DAG daily, but at a different time each day, for instance:
How can I achieve this?
Upvotes: 2
Views: 2265
Reputation: 599
How about adding a delay step (with a randomized sleep interval) and scheduling it before the task that does the actual work?
E.g.
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2 import path

def _delay_dag():
    import random
    import time
    delay_by = random.randint(0, 60 * 60 * 6)  # e.g. to delay up to 6h from schedule time
    print(f'Delaying start by {delay_by} seconds..')
    time.sleep(delay_by)
    return True

def _do_something(ti, **kwargs):
    # the actual work goes here
    pass

# default_args is assumed to be defined earlier in the file
with DAG('my_DAG_with_random_start',
         schedule_interval='@daily',
         tags=['test', 'random_start'],
         default_args=default_args) as dag:
    delay_dag = PythonOperator(
        task_id='delay_dag',
        python_callable=_delay_dag
    )
    do_something = PythonOperator(
        task_id='do_something',
        provide_context=True,  # needed on Airflow 1.10 only; Airflow 2 passes the context automatically
        python_callable=_do_something
    )
    delay_dag >> do_something
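A minimal sketch of the same delay idea in plain Python, with the sleep function and the random generator passed in so the logic can be checked without actually waiting. The names `pick_delay` and `delay_start` are hypothetical helpers, not part of Airflow.

```python
import random
import time


def pick_delay(max_delay_seconds, rng=random):
    """Return a random whole number of seconds in [0, max_delay_seconds]."""
    if max_delay_seconds < 0:
        raise ValueError("max_delay_seconds must be non-negative")
    return rng.randint(0, max_delay_seconds)  # randint includes both endpoints


def delay_start(max_delay_seconds, sleep=time.sleep, rng=random):
    """Sleep for a random delay and return the number of seconds slept."""
    delay_by = pick_delay(max_delay_seconds, rng)
    print(f"Delaying start by {delay_by} seconds..")
    sleep(delay_by)  # time.sleep by default; replaceable for a dry run
    return delay_by
```

Inside the DAG, something like `python_callable=lambda: delay_start(60 * 60 * 6)` would then behave like `_delay_dag` above.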
Upvotes: 2
Reputation: 2919
If you want the DAG to be scheduled at a random time once a day, write some Python helper code. In your DAG file, right before you define the DAG, seed the random generator with a value that only changes with the date, so the pseudo-random result stays fixed for the whole day.
In this example, I've converted the full date to a number and used that as the seed, but you can use whatever method you prefer.
Something like the code below should work.
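import datetime
import random
from airflow import DAG
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString = "0 {} * * *".format(random.randint(0, 23))  # minute 0 of a random hour; randint includes both ends, hours are 0-23
dag = DAG('TestDag', schedule_interval=randomCronString, default_args=args, catchup=False)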
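As a rough sketch of the date-seeded idea, the helper below builds the cron string from an explicit date and uses its own `random.Random` instance, so the module-level random state is left alone. The name `daily_cron` and the extra random minute are assumptions made for illustration, not part of the answer above.

```python
import datetime
import random


def daily_cron(day):
    """Build a cron string with a pseudo-random minute and hour for `day`."""
    seed = int(day.strftime("%Y%m%d"))  # e.g. 2024-03-15 -> 20240315
    rng = random.Random(seed)           # local generator; global state untouched
    hour = rng.randint(0, 23)           # cron hours run 0-23
    minute = rng.randint(0, 59)         # cron minutes run 0-59
    return f"{minute} {hour} * * *"


today = datetime.date.today()
print(daily_cron(today))  # same string on every call made on the same day
```

The result could be passed as `schedule_interval=daily_cron(datetime.date.today())` in place of `randomCronString`.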
Edit
Airflow re-evaluates the DAG file every few seconds, but because the random generator is seeded, the value only changes when the seed changes (in this case, when the day rolls over). Keep in mind, though, that on most systems Airflow runs on UTC.
If you run the following code block over and over
import random
import datetime
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString = "0 {} * * *".format(random.randint(0, 23))
print(randomCronString)
you get the same result on every run for as long as the date stays the same, for example:
0 1 * * *
0 1 * * *
0 1 * * *
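Since the seed flips when the date does, it matters which clock supplies the date. A small sketch, assuming you want the seed to follow the UTC calendar day; `utc_date_seed` is a hypothetical helper name.

```python
import datetime


def utc_date_seed(now):
    """Return the UTC calendar date of `now` as an integer like 20240102."""
    if now.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    utc_day = now.astimezone(datetime.timezone.utc).date()
    return int(utc_day.strftime("%Y%m%d"))


# 23:30 on 1 January at UTC-5 is already 04:30 on 2 January in UTC.
local = datetime.datetime(
    2024, 1, 1, 23, 30,
    tzinfo=datetime.timezone(datetime.timedelta(hours=-5)),
)
print(utc_date_seed(local))  # 20240102
```

Calling it with `datetime.datetime.now(datetime.timezone.utc)` gives a seed that changes at midnight UTC rather than at local midnight.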
Upvotes: 4