Reputation: 1359
As step one in my DAG, I get a list of items from some source, say idList, with a count of around 100.
Is it possible in Airflow to process all 100 items in idList with a maximum task concurrency of 4 (i.e. 4 at a time)? As soon as one task finishes, it should pick up the next id from idList and dynamically create a task to process it.
I have tried Dynamic Task Mapping, but it doesn't seem to have a max parallelization/concurrency factor associated with it for the specific DAG run.
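For reference, a minimal sketch of what I tried (the task names and the static id source are placeholders for my real setup):

import pendulum
from airflow.decorators import dag, task

@dag(start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), schedule=None)
def process_ids():
    @task
    def get_id_list():
        # placeholder: the real list comes from an external source, ~100 ids
        return list(range(100))

    @task
    def process_id(item_id):
        print(f"processing {item_id}")

    # expands into one mapped task instance per id, but with no obvious way
    # to cap how many of them run at once within this DAG run
    process_id.expand(item_id=get_id_list())

process_ids()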
Upvotes: 2
Views: 3585
Reputation: 5096
In Airflow 2.6, we introduced a new parameter max_active_tis_per_dagrun
to control the mapped task concurrency in the same DAG run.
Here is an example:
import pendulum
import time
from airflow.decorators import dag, task
@dag(
    dag_id='max_active_tis_per_dagrun',
    default_args={},
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None
)
def processing_dag():
    @task
    def get_numbers():
        return list(range(20))

    # at most 2 mapped instances of this task run at once within a single DAG run
    @task(max_active_tis_per_dagrun=2)
    def process(number):
        print(number)
        time.sleep(5)

    numbers = get_numbers()
    process.expand(number=numbers)

my_dag = processing_dag()
You can trigger 4 DAG runs via the UI and check how many mapped tasks are running in parallel in each DAG run.
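If you prefer the CLI, triggering a few runs with something like airflow dags trigger max_active_tis_per_dagrun should show the same behaviour.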
Upvotes: 3
Reputation: 3064
You can use pools to limit parallelism. Configure the name of a pool on the mapped task (e.g. pool="max_2"), for example:
import time
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(dag_id="dynamic_task_mapping_with_pool", start_date=datetime(2023, 1, 1), schedule_interval=None):

    @task
    def list_files():
        return list(range(10))

    # every mapped instance occupies a slot in the "max_2" pool,
    # so at most 2 of them run at the same time
    @task(pool="max_2")
    def process_files(file):
        print(f"Do something with {file}")
        time.sleep(5)

    process_files.expand(file=list_files())
With a pool of size 2, you'll see the mapped task instances progress in batches of two.
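Note that a pool named max_2 with 2 slots has to exist before the run: create it under Admin -> Pools in the UI, or (assuming an Airflow 2.x CLI) with something like:

airflow pools set max_2 2 "limit mapped task parallelism"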
Upvotes: 1