Jerald Baker

Reputation: 1359

Airflow - execute X dynamic tasks with max 4 tasks in parallel

As a first step in my DAG, I get a list of items from some source, say idList, with a count of about 100.

Is it possible in Airflow to process all 100 items in idList with a maximum task concurrency of 4 (i.e. 4 at a time)? After one task completes, it should pick up the next id from idList and dynamically create a task to process it.

I have tried Dynamic Task Mapping, but it doesn't seem to have a maximum parallelization/concurrency setting for a specific DAG run.

Upvotes: 2

Views: 3585

Answers (2)

Hussein Awala

Reputation: 5096

In Airflow 2.6, we introduced a new parameter, max_active_tis_per_dagrun, to control mapped task concurrency within the same DAG run.

Here is an example:

import pendulum
import time

from airflow.decorators import dag, task


@dag(
    dag_id='max_active_tis_per_dagrun',
    default_args={},
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None
)
def processing_dag():

    @task
    def get_numbers():
        # Source of the items to process (stands in for the real id list)
        return list(range(20))

    # Run at most 2 mapped instances of this task at a time within a single DAG run
    @task(max_active_tis_per_dagrun=2)
    def process(number):
        print(number)
        time.sleep(5)

    numbers = get_numbers()

    process.expand(number=numbers)


my_dag = processing_dag()

You can trigger 4 DAG runs via the UI and check how many mapped tasks run in parallel in each DAG run.
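Since max_active_tis_per_dagrun is a regular BaseOperator argument, the same limit can also be applied with the classic operator API. A minimal sketch, assuming Airflow 2.6+ (the dag_id, task_id and callable below are only illustrative):

import time

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def print_and_wait(number):
    # Stand-in for the real processing logic
    print(number)
    time.sleep(5)


with DAG(
    dag_id="max_active_tis_per_dagrun_classic",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
):
    # At most 2 mapped instances of this task run at once within a single DAG run
    PythonOperator.partial(
        task_id="process",
        python_callable=print_and_wait,
        max_active_tis_per_dagrun=2,
    ).expand(op_args=[[n] for n in range(20)])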

Upvotes: 3

Bas Harenslak

Reputation: 3064

You can use pools to limit parallelism. Create a pool with the desired number of slots (via the UI, CLI, or API) and set its name on the mapped task (e.g. pool="max_2"), for example:

import time
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="dynamic_task_mapping_with_pool", start_date=datetime(2023, 1, 1), schedule_interval=None):

    @task
    def list_files():
        # Source of the items to process
        return list(range(10))

    # Each mapped instance occupies one slot in the "max_2" pool
    @task(pool="max_2")
    def process_files(file):
        print(f"Do something with {file}")
        time.sleep(5)

    process_files.expand(file=list_files())

With a pool of size 2, you'll see the mapped task instances progress in batches of 2.
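Note that the pool has to exist before the tasks run; one way to create a 2-slot pool named max_2 is the Airflow CLI (the description string here is just an example):

airflow pools set max_2 2 "Limit mapped task instances to 2 at a time"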


Upvotes: 1
