Reputation: 586
I have a few tasks that can be run at the same time. When they're finished I need to run a final task. I've tried to do this using task grouping like so:
import datetime

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
    'my_dag',
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    with TaskGroup(group_id='task_group_1') as tg1:
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )
        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )
        [task1, task2]

    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task
However, what happens here is that final_task is run multiple times, once after each task in the task group:
task1 -> final_task
task2 -> final_task
What I want is for the tasks in the task group to run in parallel and, when they have all finished, for the final task to run just once:
[task1, task2] -> final_task
I thought using task groups would help me accomplish this requirement but it isn't working as expected. Can anyone help? Thank you.
EDIT: Here is the result from the Airflow docs example. It results in task3 being run after both group1.task1 and group1.task2. I need it to run just once after both of the grouped tasks are finished.
LAST EDIT: It turns out I misunderstood tree view - graph view confirms the grouping operation though I am still getting some other errors for the final task. Thanks for helping me learn more about DAGs.
Upvotes: 1
Views: 7182
Reputation: 6269
Try removing [task1, task2] from the TaskGroup so that it looks like the following:
import datetime

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
    'my_dag',
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    with TaskGroup(group_id='task_group_1') as tg1:
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )
        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )

    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task
I don't think you need to return anything from the TaskGroup as you're doing; a bare [task1, task2] expression on its own line has no effect. Just reference the TaskGroup itself as a dependency.
Here is an example from the Apache Airflow documentation:
with TaskGroup("group1") as group1:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")

task3 = EmptyOperator(task_id="task3")

group1 >> task3
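To illustrate why a dependency like group1 >> task3 should give a single task3 that waits for both grouped tasks, here is a toy model of the dependency graph. It uses Python's standard-library graphlib (Python 3.9+) rather than Airflow itself; the upstream dictionary and the run_order helper are made up for illustration, and Airflow's real scheduler is far more involved.

```python
from graphlib import TopologicalSorter

# Hypothetical model: each task maps to the set of tasks it depends on.
upstream = {
    "task1": set(),
    "task2": set(),
    "task3": {"task1", "task2"},  # one node with two upstream edges
}


def run_order(graph):
    """Return batches of tasks that could run together, in dependency order."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        # Everything whose upstream tasks have all been marked done.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = run_order(upstream)
print(batches)  # [['task1', 'task2'], ['task3']]
```

In this sketch task3 appears exactly once, in the batch after task1 and task2. A view that draws task3 under each of its upstream tasks is showing the same single node twice, not two separate runs.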
Also, you don't need to use TaskGroups to achieve this functionality. You could simply do this:
import datetime

import airflow

with airflow.DAG(
    'my_dag',
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    task1 = MyOperator(
        task_id='task1',
        dag=dag,
    )
    task2 = MyOperator(
        task_id='task2',
        dag=dag,
    )
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    # final_task gets two upstream tasks and runs once both have finished
    task1 >> final_task
    task2 >> final_task
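Airflow also accepts a list on the left-hand side, so the two dependency lines can be written as [task1, task2] >> final_task. As a rough sketch of how that kind of syntax can work in plain Python, here is a hypothetical Task class (not Airflow's operator implementation) that only records upstream task ids, showing that both spellings produce the same dependencies.

```python
class Task:
    """Hypothetical stand-in for an operator; it only records upstream ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # task >> other: this task becomes an upstream of `other`.
        other.upstream.add(self.task_id)
        return other

    def __rrshift__(self, others):
        # [a, b] >> task: list has no __rshift__, so Python falls back here.
        for task in others:
            self.upstream.add(task.task_id)
        return self


# Spelling 1: one dependency line per upstream task.
a1, b1, final1 = Task("task1"), Task("task2"), Task("final_task")
a1 >> final1
b1 >> final1

# Spelling 2: a list of upstream tasks on the left-hand side.
a2, b2, final2 = Task("task1"), Task("task2"), Task("final_task")
[a2, b2] >> final2

same = final1.upstream == final2.upstream
print(same)  # True
```

Either way there is a single final task with two upstream tasks, which is the [task1, task2] -> final_task shape asked for in the question.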
Upvotes: 5