SJxD

Reputation: 269

how many tasks can be scheduled in a single airflow dag?

I am completely new to Airflow, and I couldn't find anywhere how many tasks can be scheduled in a single Airflow DAG, or what the maximum size of each task can be.

I want to schedule a task that should be able to handle millions of queries, identify each query's type, and schedule the next task according to that type.

I read the complete documentation but couldn't find it.

Upvotes: 10

Views: 21253

Answers (3)

scwol111

Reputation: 116

I think the maximum number of scheduled tasks depends on the Airflow metadata database. I used SQLite with my Airflow installation; when I tried to create a lot of tasks, Airflow raised an error.

Traceback (most recent call last):
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlite3.OperationalError: too many SQL variables

Thus, for SQLite, the maximum number of scheduled tasks is 996 (found experimentally).

# DAG for limit testing

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow_user',
    'start_date': days_ago(0),
}

with DAG(
    'Task_limit',
    default_args=default_args,
    description='Find task limit',
    schedule_interval=None,
) as dag:
    for i in range(996):
        task = BashOperator(
            task_id='try_port_' + str(i),
            bash_command='echo ' + str(i),
        )
# if the range is increased, then the "too many SQL variables" error above occurs

Maybe for another database this number will be higher.

P.S. After a while, I will replace SQLite with PostgreSQL, so I will find a limit for the new DB.

Upvotes: 1

alexopoulos7

Reputation: 912

Well, the concurrency parameter lets you control how many running task instances a DAG is allowed to have; beyond that point, tasks get queued. The FAQ on the Airflow site has really valuable information about task scheduling.
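
For illustration, here is a minimal sketch of how that parameter could be set on a DAG, in the same style as the example above. The DAG id and task ids are hypothetical, and on newer Airflow releases the parameter is named max_active_tasks rather than concurrency:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator

with DAG(
    'concurrency_demo',          # hypothetical DAG id
    start_date=days_ago(1),
    schedule_interval=None,
    concurrency=4,               # at most 4 running task instances for this DAG;
                                 # newer Airflow versions call this max_active_tasks
) as dag:
    for i in range(20):
        BashOperator(
            task_id='query_' + str(i),      # hypothetical task ids
            bash_command='echo ' + str(i),
        )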

Lastly, regarding the size of tasks, there is no limit from the Airflow side. The only soft requirement posed by Airflow is that tasks be idempotent. So basically, as Taylor explained above, task size is limited by the executor/worker that you select (Kubernetes, Celery, Dask, or Local) and the resources available to your workers.

Upvotes: 0

Taylor D. Edmiston

Reputation: 13016

There are no limits to how many tasks can be part of a single DAG.

Through the Airflow config, you can set concurrency limitations for execution time, such as the maximum number of parallel tasks overall, the maximum number of concurrent DAG runs for a given DAG, etc. There are settings at the Airflow level, DAG level, and operator level for coarse- to fine-grained control.

Here are the high-level concurrency settings you can tweak:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

Reference: default_airflow.cfg

The parallelism settings are described in more detail in this answer. As far as the maximum "size" of each task, I'm assuming you're referring to resource allocation, such as memory or CPU. This is user configurable depending upon which executor you choose to use:

  • In a simple setup with LocalExecutor, for instance, it will use any resources available on the host.
  • With the MesosExecutor, on the other hand, one can define the max amount of CPU and/or memory that will be allocated to a task instance, and through DockerOperator you also have the option to define the maximum amount of CPU and memory a given task instance will use (see the sketch after this list).
  • With the CeleryExecutor, you can set worker_concurrency to define the number of task instances each worker will take.
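
As a rough sketch of the DockerOperator point above: the image, command, and task id are made up, this assumes the Docker provider's cpus and mem_limit parameters, and the operator would sit inside a with DAG(...): block like the ones above.

from airflow.providers.docker.operators.docker import DockerOperator

# Sketch only: limit the container running this task instance to
# roughly one CPU and 512 MB of memory (image/command/task_id are hypothetical).
classify = DockerOperator(
    task_id='classify_queries',
    image='my-org/query-classifier:latest',
    command='python classify.py',
    cpus=1.0,
    mem_limit='512m',
)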

Another way to restrict execution is to use the Pools feature (example). For instance, you can set the max size of a pool of tasks talking to a database to 5 to prevent more than 5 tasks from hitting it at once (and potentially overloading the database/API/whatever resource you want to pool against).
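
Here is a hedged sketch of the operator-level side of that. The pool name, task id, and command are made up; the pool itself would first be created with 5 slots via the Airflow UI or the pools CLI:

from airflow.operators.bash import BashOperator

# Sketch: assumes a pool named 'db_pool' with 5 slots already exists.
# At most 5 task instances assigned to this pool run at once; extras queue.
db_task = BashOperator(
    task_id='query_database',     # hypothetical task id
    bash_command='run_query.sh',  # hypothetical command
    pool='db_pool',
)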

Upvotes: 16
