Reputation: 1311
I have a DAG that has 30 (or more) dynamically created parallel tasks.
I have the concurrency option set on that DAG so that only a single DAG Run is active when catching up the history.
When I run it on my server, only 16 tasks actually run in parallel, while the remaining 14 just sit in the queued state.
Which setting should I alter so that I have only 1 DAG Run running, but with all 30+ tasks running in parallel?
According to this FAQ, it seems to be one of dag_concurrency or max_active_runs_per_dag, but the former seems to be overridden by the concurrency setting already, while the latter seemed to have no effect (or I effectively messed up my setup).
Here's the sample code:
import datetime as dt
import logging

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

import config
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': True,
'concurrency': 1,
'retries': 0,
}
def print_operators(ds, **kwargs):
logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")
dag = DAG(
dag_id='test_parallelism_dag',
start_date=dt.datetime(2019, 1, 1),
default_args=default_args,
schedule_interval='@daily',
catchup=True,
template_searchpath=[config.DAGS_PATH],
params={'schema': config.SCHEMA_DB},
max_active_runs=1,
)
# dynamically create 60 parallel tasks
print_operator_tasks = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_{i}',
    python_callable=print_operators,
    provide_context=True,
    dag=dag,
) for i in range(60)]
dummy_operator_start = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_start',
    dag=dag,
)
dummy_operator_end = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_end',
    dag=dag,
)
dummy_operator_start >> print_operator_tasks >> dummy_operator_end
EDIT 1:
My current airflow.cfg contains:
executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26
My env variables are as follows (each set to a different value so it's easy to spot which one helps):
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22
The resulting Gantt diagram kind of hints that setting the DAG_CONCURRENCY env variable is what works.
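To double-check which values Airflow actually resolves after the env-var overrides, a quick sketch (assuming an Airflow 1.10-style setup where these keys live in the [core] section) is to print them from the same environment the scheduler runs in:

# Print the effective configuration values after env-var overrides are applied.
from airflow.configuration import conf

for key in ('executor', 'parallelism', 'dag_concurrency', 'max_active_runs_per_dag'):
    print(key, '=', conf.get('core', key))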
Upvotes: 4
Views: 11772
Reputation: 18844
Update the concurrency setting in your airflow.cfg file as well; if it is 16, increase it to 32.
If you are using the CeleryExecutor, also change worker_concurrency to 32.
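For reference, these are roughly the entries involved, shown here as a sketch of an airflow.cfg with 1.10-era key names (adjust the numbers to your load):

[core]
# hard cap on task instances running at once across the whole Airflow installation
parallelism = 32
# default number of task instances a single DAG may run concurrently,
# used when the DAG does not set its own concurrency
dag_concurrency = 32

[celery]
# only relevant for the CeleryExecutor: tasks each worker picks up at once
worker_concurrency = 32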
Upvotes: 3
Reputation: 1311
The actual parameter to change was dag_concurrency in airflow.cfg, or it can be overridden with the AIRFLOW__CORE__DAG_CONCURRENCY env variable.
As per the docs I referred to in my question:
concurrency: The Airflow scheduler will run no more than $concurrency task instances for your DAG at any given time. Concurrency is defined in your Airflow DAG. If you do not set the concurrency on your DAG, the scheduler will use the default value from the dag_concurrency entry in your airflow.cfg.
Which means the following simplified code:
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': True,
'concurrency': 1,
}
dag = DAG(
dag_id='test_parallelism_dag',
default_args=default_args,
max_active_runs=1,
)
should be rewritten to:
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': True,
}
dag = DAG(
dag_id='test_parallelism_dag',
default_args=default_args,
max_active_runs=1,
concurrency=30
)
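Alternatively, the same limit can be applied without touching the DAG file by overriding the config entry mentioned above, for example:

export AIRFLOW__CORE__DAG_CONCURRENCY=30

That changes the default for every DAG that does not define its own concurrency, whereas the concurrency=30 argument pins it for this DAG only.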
My code actually had the wrong assumption that default_args are at some point substituted into the actual kwargs of the DAG constructor. I don't know what led me to that conclusion back then, but I guess setting concurrency to 1 there was some draft leftover which never actually affected anything, and the actual DAG concurrency was taken from the config default, which is 16.
Upvotes: 7