Kris

Reputation: 1744

Iterative Airflow execution

I have the following DAG with some simple tasks:

hour_list = ["0:00", "1:00", "2:00"]

for hour in hour_list:
    # One shell task per hour
    bash_op = BashOperator(
        task_id='task1_op1' + hour,
        bash_command="date",
        dag=dag
    )

    # One Python task per hour, downstream of the shell task
    py_op = PythonOperator(
        task_id='doit' + hour,
        provide_context=True,
        python_callable=python_method,
        dag=dag)

    py_op.set_upstream(bash_op)

Now I see that the DAG executes the tasks for all hours 0:00 to 2:00 in parallel, which is expected behaviour. But I want to run them one hour after the other, so that the second hour's execution depends on the first hour's. I'm not sure whether any change in settings would help here. I'd appreciate your thoughts. Thanks.

Upvotes: 0

Views: 423

Answers (1)

joebeeson

Reputation: 4366

You can accomplish this by placing an airflow.operators.sensors.TimeSensor "in between" the tasks. Something similar to the following:

from datetime import time

from airflow.operators.sensors import TimeSensor

[...]

for hour in ["00:00", "01:00", "02:00"]:
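    # The sensor succeeds once the wall clock passes target_time, so each
    # hour's BashOperator/PythonOperator pair only starts after its hour.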
    TimeSensor(
        dag=dag,
        task_id="wait_{}".format(hour),
        target_time=time(*map(int, hour.split(":")))
    ) >> BashOperator(
        dag=dag,
        task_id="task1_op1_{}".format(hour),
        bash_command="date"
    ) >> PythonOperator(
        dag=dag,
        task_id="doit_{}".format(hour),
        provide_context=True,
        python_callable=python_method
    )
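
Note that the sensors gate each hour on the wall clock, so the hours still run independently of one another once their target times pass. If you want the strict dependency you described, where hour N+1 only runs after hour N has finished, you can instead chain the loop iterations directly. A minimal sketch, assuming the same dag and python_method as above:

prev = None
for hour in ["00:00", "01:00", "02:00"]:
    bash_op = BashOperator(
        dag=dag,
        task_id="task1_op1_{}".format(hour),
        bash_command="date"
    )
    py_op = PythonOperator(
        dag=dag,
        task_id="doit_{}".format(hour),
        provide_context=True,
        python_callable=python_method
    )
    # Within one hour: the shell task runs first, then the Python task
    bash_op >> py_op
    if prev is not None:
        # Across hours: this hour's first task waits for the
        # previous hour's last task to finish
        prev >> bash_op
    prev = py_op

With this wiring the second hour starts as soon as the first hour completes, regardless of the actual time of day; combine it with the TimeSensor approach if you need both conditions.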

Upvotes: 1
