Eduard Sukharev
Eduard Sukharev

Reputation: 1311

Apache Airflow: run all parallel tasks in single DAG run

I have a DAG that has 30 (or more) dynamically created parallel tasks.

I have concurrency option set on that DAG so that I only have single DAG Run running, when catching up the history. When I run it on my server only 16 tasks actually run in parallel, while the rest 14 just wait being queued.

Which setting should I alter so that I have only 1 DAG Run running, but with all 30+ tasks running in parallel?

According to this FAQ, it seems like it's one of the dag_concurrency or max_active_runs_per_dag, but the former seem to be overdriven by concurrency setting already, while the latter seemed to have no effect (or I effectively messed up my setup). Here's the sample code:

import datetime as dt
import logging

from airflow.operators.dummy_operator import DummyOperator

import config

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
    'retries': 0,
}


def print_operators(ds, **kwargs):
    logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")


dag = DAG(
    dag_id='test_parallelism_dag',
    start_date=dt.datetime(2019, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    template_searchpath=[config.DAGS_PATH],
    params={'schema': config.SCHEMA_DB},
    max_active_runs=1,
)

print_operators = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_{i}',
    python_callable=print_operators,
    provide_context=True,
    dag=dag
) for i in range(60)]

dummy_operator_start = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_start',
)

dummy_operator_end = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_end',
)

dummy_operator_start >> print_operators >> dummy_operator_end

EDIT 1: My current airflow.cfg contains:

executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26

My env variables are as following (set all of them different to easily spot which one helps):

AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22

And with that I have following Gantt diagram: enter image description here

Which kind of gives me a hint that setting DAG_CONCURRENCY env variable works.

Upvotes: 4

Views: 11772

Answers (2)

kaxil
kaxil

Reputation: 18844

Update the concurrency config as well in your airflow.cfg file. If it is 16, increase it to 32.

If you are using Celery Executor, change worker_concurrency to 32.

Upvotes: 3

Eduard Sukharev
Eduard Sukharev

Reputation: 1311

The actual parameter to change was dag_concurrency in airflow.cfg or override it with AIRFLOW__CORE__DAG_CONCURRENCY env variable.

As per docs I referred to in my question:

concurrency: The Airflow scheduler will run no more than $concurrency task instances for your DAG at any given time. Concurrency is defined in your Airflow DAG. If you do not set the concurrency on your DAG, the scheduler will use the default value from the dag_concurrency entry in your airflow.cfg.

Which means following simplified code:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
}


dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
)

should be rewritten to:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
}


dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
    concurrency=30
)

My code actually has wrong assumption that default_args at some point substitute actual kwargs to DAG constructor. I don't know what lead me to that conclusion back then, but I guess setting concurrency to 1 there is some draft leftover, which never actually affected anything and actual DAG concurrency was set from config default, which is 16.

Upvotes: 7

Related Questions