Anis Smail
Anis Smail

Reputation: 779

Airflow : How to schedule a dag at different times?

I want to schedule a dag daily, but at different times in the day, for instance :

How can I achieve this ?

Upvotes: 2

Views: 2265

Answers (2)

szeta
szeta

Reputation: 599

How about adding a delay-step (with randomized sleep interval), and schedule it before actually doing something?

E.g.

def _delay_dag():
    import random
    import time
    delay_by = random.randint(0,60*60*6) #e.g. to delay up to 6h from schedule time
    print(f'Delaying start by {delay_by} seconds..')
    time.sleep(delay_by)
    return True

def _do_something(ti, *kwargs):
    pass
    
with DAG('my_DAG_with_random_start', 
         schedule_interval='@daily', 
         tags=['test','random_start'],
         default_args=default_args) as dag:
    
    delay_dag = PythonOperator(
        task_id = 'delay_dag',
        python_callable=_delay_dag
    )

    do_something = PythonOperator(
        task_id = 'do_something',
        provide_context=True,
        python_callable=_do_something
    )
    
    delay_dag >> do_something

Upvotes: 2

zglin
zglin

Reputation: 2919

If you want the dag to be scheduled randomly once a day, write python helper code. In your dag code right before you define your dag, put a seeded random (that doesn't change with date) to create a pseudo rand.

In this example, I've converted the full date to numerical date but you can use whatever method you prefer.

Something Like the below code should work.

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
dag=Dag('TestDag,schedule_interval=randomCronString,default_args=args, catchup=False)

Edit

Airflow runs the dag every 5 seconds but by using a seeded random, you're forcing the random to only change when the seed changes (in this case when the day flips over), keep in mind though, in most systems airflow is on UTC.

If you run the following code block over and over

import random
import datetime

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)

You get the following results

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))

print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

Upvotes: 4

Related Questions