Reputation: 1744
I have the following DAG with some simple tasks,
hour_list = ["0:00", "1:00", "2:00"]
for hour in hour_list:
    bash_op = BashOperator(
        task_id='task1_op1' + hour,
        bash_command="date",
        dag=dag,
    )
    py_op = PythonOperator(
        task_id='doit' + hour,
        provide_context=True,
        python_callable=python_method,
        dag=dag,
    )
    py_op.set_upstream(bash_op)
Now, I see the DAG executing all the hours 0:00 to 2:00 in parallel, which is the expected behaviour. However, I want to run them one hour after the other, so that the second hour's execution depends on the first hour's. I'm not sure whether any change in settings might help here. I appreciate your thoughts. Thanks.
Upvotes: 0
Views: 423
Reputation: 4366
You can accomplish this by placing an airflow.operators.sensors.TimeSensor
"in between" the tasks. Something similar to the following:
from datetime import time
from airflow.operators.sensors import TimeSensor
[...]
for hour in ["00:00", "01:00", "02:00"]:
    TimeSensor(
        dag=dag,
        task_id="wait_{}".format(hour),
        target_time=time(*map(int, hour.split(":"))),
    ) >> BashOperator(
        dag=dag,
        task_id="task1_op1_{}".format(hour),
        bash_command="date",
    ) >> PythonOperator(
        dag=dag,
        task_id="doit_{}".format(hour),
        provide_context=True,
        python_callable=python_method,
    )
Upvotes: 1