Reputation: 588
I have a DAG in Airflow with tasks A, B, C1, C2, D1, and D2. When I execute it, the tasks run serially, in one of the following orders:
A -> B -> C1 -> C2 -> D1 -> D2
A -> B -> C2 -> C1 -> D2 -> D1
My requirement, however, is to run tasks C1 and C2 in parallel.
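For reference, here is a minimal sketch of the DAG shape those orderings imply (the original task code isn't reproduced above, so DummyOperator stands in for the real tasks, and the C1 -> D1 / C2 -> D2 pairing is inferred from the orderings):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Sketch only: the dag_id, start_date and operators are placeholders.
dag = DAG('parallel_example', start_date=datetime(2017, 1, 1),
          schedule_interval=None)

a = DummyOperator(task_id='A', dag=dag)
b = DummyOperator(task_id='B', dag=dag)
c1 = DummyOperator(task_id='C1', dag=dag)
c2 = DummyOperator(task_id='C2', dag=dag)
d1 = DummyOperator(task_id='D1', dag=dag)
d2 = DummyOperator(task_id='D2', dag=dag)

# A -> B, then C1 and C2 fan out and should run in parallel.
a.set_downstream(b)
b.set_downstream(c1)
b.set_downstream(c2)
c1.set_downstream(d1)
c2.set_downstream(d2)

Here is the relevant part of my airflow.cfg: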
[core]
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
#executor = LocalExecutor
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
[webserver]
# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1
# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30
# Secret key used to run your flask app
secret_key = temporary_key
# Number of workers to run the Gunicorn web server
workers = 4
[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16
[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2
Upvotes: 4
Views: 5927
Reputation: 414
Add concurrency=x (where x is an integer greater than 1) to your DAG properties.
max_active_runs is DAG-level concurrency (how many runs of the DAG can be active at once); concurrency is task-level concurrency (how many task instances of that DAG can run at once).
Example:
from datetime import datetime

from airflow import DAG

default_args = {'owner': 'airflow', 'start_date': datetime(2017, 1, 1)}

dag = DAG(
    'my_dag',                       # dag_id
    default_args=default_args,
    schedule_interval='00 03 * * *',
    max_active_runs=2,              # concurrent DAG runs
    concurrency=2)                  # concurrent task instances
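If concurrency is not set on the DAG, it falls back to the dag_concurrency value in airflow.cfg (16 in the configuration above).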
Upvotes: 2
Reputation: 418
This seems to be a configuration issue. From your configuration I can see that the executor is CeleryExecutor. Check the database and message broker components: if those are not configured for parallel execution, your tasks won't run in parallel either.
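For illustration, a working CeleryExecutor setup needs settings along these lines in airflow.cfg (the hosts, ports and credentials below are placeholders, not values from the question):

[core]
executor = CeleryExecutor
# A metadata DB that supports concurrent access, e.g. Postgres
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

[celery]
# Message broker shared by the scheduler and the workers, e.g. RabbitMQ
broker_url = amqp://guest:guest@localhost:5672//
celery_result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow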
Upvotes: 0
Reputation: 18844
If you are just testing it on a single machine then I suggest using LocalExecutor. SequentialExecutor runs tasks serially, and CeleryExecutor would need a cluster of machines with a message broker.

Also, when you use LocalExecutor, you should use a meta DB other than sqlite, as sqlite doesn't support parallel reads. So you can use Postgres or MySQL and accordingly change sql_alchemy_conn in the airflow.cfg file.
Read this: https://airflow.apache.org/howto/initialize-database.html
“LocalExecutor”, an executor that can parallelize task instances locally.
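A minimal sketch of that change (the connection string below is a placeholder; point it at your own database, then re-initialize the metadata DB):

# airflow.cfg, [core] section
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

# then, from the shell:
# airflow initdb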
Upvotes: 1