Reputation: 190
I am working with airflow on Google Cloud Composer (version: composer-1.10.2-airflow-1.10.6).
I realized that my scheduler doesn't schedule tasks when there are a lot of tasks to process (see the Gantt view below).
(Don't pay attention to the colours: the red tasks are "createTable" operators that fail if the table already exists, so they have to fail 5 times before the next part (the important one) of the DAG runs.)
There are gaps of hours between tasks! (For example, 5 hours between 10am and 3pm where nothing happened.)
Normally it works fine with ~40 DAGs of about 100-200 tasks each (sometimes a bit more). But recently I added 2 DAGs with a lot of tasks (~5000 each), and now the scheduler is very slow or doesn't schedule tasks at all. In the screenshot, I paused the 2 big DAGs at 3pm, and the scheduler came back and started doing its work fine again.
Do you have any solution for this?
Airflow is meant to be a tool that handles an "infinite" number of tasks.
Here is some information about my environment and Airflow configuration:
╔════════════════════════════════╦═══════╗
║ Airflow parameter ║ value ║
╠════════════════════════════════╬═══════╣
║ -(celery)- ║ ║
║ worker_concurrency ║ 32 ║
║ -(webserver)- ║ ║
║ default_dag_run_display_number ║ 2 ║
║ workers ║ 2 ║
║ worker_refresh_interval ║ 60 ║
║ -(core)- ║ ║
║ max_active_runs_per_dag ║ 1 ║
║ dagbag_import_timeout ║ 600 ║
║ parallelism ║ 200 ║
║ min_file_process_interval ║ 60 ║
║ -(scheduler)- ║ ║
║ processor_poll_interval ║ 5 ║
║ max_threads ║ 2 ║
╚════════════════════════════════╩═══════╝
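If it helps, the Airflow configuration overrides set on the environment can also be listed with gcloud (ENVIRONMENT_NAME and LOCATION are placeholders here):
# Show the environment configuration, including any airflowConfigOverrides
gcloud composer environments describe ENVIRONMENT_NAME --location LOCATION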
Thank you for your help
EDIT:
26 of my DAGs are generated by a single .py file that parses a huge JSON variable to create all the DAGs and tasks.
Maybe the problem comes from this, because today Airflow is scheduling tasks from DAGs other than these 26 (especially the 2 big DAGs I described). More precisely, Airflow sometimes schedules the tasks of my 26 DAGs, but it schedules the tasks of the other DAGs much more easily and much more often.
Upvotes: 3
Views: 3491
Reputation: 91609
High inter-task latency is usually an indicator that there is a scheduler-related bottleneck (as opposed to something worker-related). Even when running the same DAGs over and over again, it's still possible for a Composer environment to suffer from performance bottlenecks like this, because work can be distributed differently each time, or there may be different processes running in the background.
To start, I would recommend increasing the number of threads available to the scheduler (scheduler.max_threads), and then ensuring that the scheduler is not consuming all the CPU of the node it resides on. You can check CPU metrics for that node by identifying where the scheduler is running, then checking in the Cloud Console. To find the node name:
# Obtain the Composer namespace name
kubectl get namespaces | grep composer
# Check for the scheduler pod; $NAMESPACE is the namespace found above, and -o wide shows the node it runs on
kubectl get pods -n $NAMESPACE -o wide | grep scheduler
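As a quick point-in-time alternative to the Cloud Console graphs (this assumes the cluster's metrics server is available, which it is by default on GKE), you can also check node CPU usage directly:
# Show current CPU/memory usage per node; look at the node hosting the scheduler pod
kubectl top nodes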
If the above doesn't help, then it's also possible that the scheduler is intentionally blocking on a condition. To inspect all the conditions that are evaluated when the scheduler is checking for tasks to run, set core.logging_level=DEBUG. In the scheduler logs (which you can filter for in Cloud Logging), you can then check all the conditions that passed or failed in order for a task to run or to stay queued.
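For completeness, both of the settings mentioned above can be applied as Airflow configuration overrides on the Composer environment rather than by editing airflow.cfg directly. A minimal sketch, where ENVIRONMENT_NAME and LOCATION are placeholders and 4 is only an example thread count:
# Override scheduler.max_threads and core.logging_level on the environment
gcloud composer environments update ENVIRONMENT_NAME --location LOCATION \
    --update-airflow-configs=scheduler-max_threads=4,core-logging_level=DEBUG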
Upvotes: 4