Hasitha

Reputation: 588

How to run tasks in parallel in Apache Airflow

I have run the following DAG in Airflow:

[DAG image: A -> B, then two branches, C1 -> D1 and C2 -> D2]
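For reference, here is a rough sketch of how the DAG above is presumably wired (a hypothetical reconstruction, since only the image shows the graph; DummyOperator stands in for the real tasks):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    # Hypothetical reconstruction: A -> B, then B fans out into
    # two branches, C1 -> D1 and C2 -> D2.
    dag = DAG('my_dag', start_date=datetime(2018, 1, 1), schedule_interval=None)

    a = DummyOperator(task_id='A', dag=dag)
    b = DummyOperator(task_id='B', dag=dag)
    c1 = DummyOperator(task_id='C1', dag=dag)
    c2 = DummyOperator(task_id='C2', dag=dag)
    d1 = DummyOperator(task_id='D1', dag=dag)
    d2 = DummyOperator(task_id='D2', dag=dag)

    a >> b
    b >> c1 >> d1
    b >> c2 >> d2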

When executing the above DAG, the tasks run serially, in one of the following orders:

A -> B -> C1 -> C2 -> D1 -> D2

A -> B -> C2 -> C1 -> D2 -> D1

but my requirement is to run the C1 and C2 tasks in parallel. Here is part of my airflow.cfg:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
#executor = LocalExecutor

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30

# Secret key used to run your flask app
secret_key = temporary_key

# Number of workers to run the Gunicorn web server
workers = 4

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2

Upvotes: 4

Views: 5927

Answers (3)

Alistair McIntyre

Reputation: 414

Add concurrency=x (where x is an integer greater than 1) to your DAG properties.

max_active_runs is DAG concurrency (how many runs of the DAG can be active at once); concurrency is task concurrency (how many task instances of the DAG can run at once).

example:

    dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval='00 03 * * *',
        max_active_runs=2,
        concurrency=2)

Upvotes: 2

Tameem

Reputation: 418

This seems to be a configuration issue. From your configuration I see that the executor is CeleryExecutor. Check the database and message broker components.

If those are not configured for parallel execution, your tasks won't run in parallel either.
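For example, the CeleryExecutor needs a message broker and a result backend configured in the [celery] section, roughly like this (a sketch only; the Redis and Postgres URLs are assumptions, and the exact option names vary slightly between Airflow versions):

# Message broker that distributes tasks to the Celery workers
# (assumed here: a local Redis instance).
broker_url = redis://localhost:6379/0

# Backend where workers record task state (older versions name this
# celery_result_backend). Assumed here: a Postgres metadata DB.
result_backend = db+postgresql://airflow:airflow@localhost/airflow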

Upvotes: 0

kaxil

Reputation: 18844

If you are just testing it on a single machine, then I suggest using LocalExecutor. SequentialExecutor runs tasks serially, and CeleryExecutor would need a cluster of machines with a message broker.

Also, when you use LocalExecutor, you should use a metadata DB other than SQLite, since SQLite doesn't support multiple connections. So you can use Postgres or MySQL and change sql_alchemy_conn in the airflow.cfg file accordingly.

Read this: https://airflow.apache.org/howto/initialize-database.html

“LocalExecutor”, an executor that can parallelize task instances locally.
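For example, the relevant airflow.cfg changes might look like this (a sketch only, assuming a local Postgres database named airflow):

# Run task instances in parallel processes on a single machine.
executor = LocalExecutor

# Point the metadata DB at Postgres instead of the default SQLite file.
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

After changing the connection string, initialize the new database with airflow initdb (or airflow db init on newer versions).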

Upvotes: 1
