CClarke

Reputation: 586

How to arrange a DAG to run some tasks in parallel and then one task when they have completed?

I have a few tasks that can be run at the same time. When they're finished, I need to run a final task. I've tried to do this with a TaskGroup, like so:

import datetime

import airflow
from airflow.utils.task_group import TaskGroup

# default_args, MyOperator and MyOtherOperator are defined elsewhere (not shown).

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    with TaskGroup(group_id='task_group_1') as tg1:
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )

        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )

        [task1, task2]

    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task

However, what happens here is that final_task runs multiple times, once after each task in the task group:

task1 -> final_task
task2 -> final_task

What I want is for the tasks in the group to run in parallel and, once they have all finished, for the final task to run just once:

[task1, task2] -> final_task

I thought using task groups would help me accomplish this, but it isn't working as expected. Can anyone help? Thank you.

EDIT: Here is the result from the Airflow docs example. It results in task3 being run after each of group1.task1 and group1.task2. I need it to run just once, after both of the grouped tasks have finished.


LAST EDIT: It turns out I misunderstood the Tree view. The Graph view confirms that the grouping works as intended, although I am still getting some other errors for the final task. Thanks for helping me learn more about DAGs.

Upvotes: 1

Views: 7182

Answers (1)

Simon D

Reputation: 6269

Try removing [task1, task2] from the TaskGroup so that it looks like the following:

import datetime

import airflow
from airflow.utils.task_group import TaskGroup

# default_args, MyOperator and MyOtherOperator are defined elsewhere (not shown).

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    with TaskGroup(group_id='task_group_1') as tg1:
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )

        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )

    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    # final_task runs once, after every task in tg1 has finished
    tg1 >> final_task

I don't think you need the [task1, task2] line inside the TaskGroup: it doesn't return anything, it just builds a list and throws it away. Just reference the TaskGroup as a dependency.

Here is an example from the Apache Airflow documentation:

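from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# Excerpt: this code runs inside a `with DAG(...) as dag:` block.
with TaskGroup("group1") as group1:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")

task3 = EmptyOperator(task_id="task3")

# task3 is downstream of both group1.task1 and group1.task2
group1 >> task3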
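To see what group1 >> task3 does to the dependency graph, here is a toy model in plain Python. It is not Airflow's implementation: the Task and Group classes are hypothetical stand-ins, and the assumption (which matches how Airflow describes TaskGroup dependencies) is that >> on a group connects each of the group's leaf tasks, i.e. those with no downstream task inside the group, to the target. The result is two edges into a single task3 node, not two copies of task3.

```python
# Toy model of TaskGroup dependency wiring (hypothetical classes, NOT Airflow's code).
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Task:
    task_id: str
    downstream: set[str] = field(default_factory=set)

    def __rshift__(self, other: Task) -> Task:
        # task >> other: other runs after task
        self.downstream.add(other.task_id)
        return other


@dataclass
class Group:
    group_id: str
    tasks: list[Task] = field(default_factory=list)

    def leaves(self) -> list[Task]:
        # Leaves are tasks with no downstream task inside the same group.
        ids = {t.task_id for t in self.tasks}
        return [t for t in self.tasks if not (t.downstream & ids)]

    def __rshift__(self, other: Task) -> Task:
        # group >> other: wire every leaf of the group to the same `other` node.
        for leaf in self.leaves():
            leaf >> other
        return other


task1 = Task("group1.task1")
task2 = Task("group1.task2")
task3 = Task("task3")
group1 = Group("group1", [task1, task2])

group1 >> task3

edges = sorted((t.task_id, d) for t in (task1, task2, task3) for d in t.downstream)
print(edges)  # [('group1.task1', 'task3'), ('group1.task2', 'task3')]
```

Because task3 is one node with two incoming edges, it is scheduled once per DAG run; the Graph view draws it exactly that way, as a single task with two arrows pointing into it.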

Also, you don't need to use TaskGroups to achieve this functionality. You could simply do this:

import datetime

import airflow

# default_args, MyOperator and MyOtherOperator are defined elsewhere (not shown).

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    task1 = MyOperator(
        task_id='task1',
        dag=dag,
    )

    task2 = MyOperator(
        task_id='task2',
        dag=dag,
    )

    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    # Equivalent one-liner: [task1, task2] >> final_task
    task1 >> final_task
    task2 >> final_task
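The same fan-in behavior can be sketched without Airflow at all. The following is a hypothetical toy scheduler, not Airflow's: it keeps a map from each task to its upstream tasks, runs every task whose upstreams have all finished in parallel on a thread pool, and repeats until everything has run. final_task only becomes ready once both task1 and task2 are done, so it runs exactly once.

```python
# Toy fan-in scheduler (hypothetical, NOT how Airflow's scheduler is implemented).
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor

# task_id -> set of upstream task_ids that must finish first
upstream = {
    "task1": set(),
    "task2": set(),
    "final_task": {"task1", "task2"},
}

run_log: list[str] = []
log_lock = threading.Lock()


def run(task_id: str) -> str:
    """Stand-in for an operator's work: just record that the task ran."""
    with log_lock:
        run_log.append(task_id)
    return task_id


def run_dag(deps: dict[str, set[str]]) -> None:
    done: set[str] = set()
    with ThreadPoolExecutor(max_workers=4) as pool:
        while len(done) < len(deps):
            # A task is ready when it hasn't run yet and all its upstreams are done.
            ready = [t for t, ups in deps.items() if t not in done and ups <= done]
            if not ready:
                raise ValueError("dependency cycle: no task is ready")
            # Ready tasks in the same wave run concurrently; map() waits for all of them.
            done.update(pool.map(run, ready))


run_dag(upstream)
print(run_log)  # task1 and task2 in either order, then final_task
```

The order of task1 and task2 in run_log can vary because they execute concurrently, but final_task is always last and appears once. Unlike this wave-by-wave loop, Airflow's scheduler can start a task as soon as its own upstreams are done, and the default all_success trigger rule additionally requires those upstreams to have succeeded.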

Upvotes: 5
