MoorzTech

Reputation: 380

Python-Celery increase concurrency on worker

TL;DR

Is it possible to increase or decrease concurrency on a running Celery worker without restarting it?

I'm using Celery 4.0.0 with RabbitMQ as the broker on Ubuntu 14.10.

My Use Case

I frequently face a large queue of tasks, most of which mainly execute an HTTP request and do some minor processing. The worker runs on a fairly powerful machine, and I'd like to maximize resource utilization on it. Most of the time this is not a problem. The exception is a large batch of these HTTP requests, which may time out or take a very long time to respond. In those cases I'd like to temporarily increase the --concurrency parameter without having to restart the worker.

Currently I'm running Celery with --concurrency 150, but this only gets the server's bottleneck (CPU) to about 10% utilization. One solution would be to spawn a second worker with a concurrency of 150 for that period and kill it afterwards, but that might add complexity down the road. I'd like to stick to one worker per machine if at all possible.

Upvotes: 3

Views: 2898

Answers (1)

MoorzTech

Reputation: 380

It would probably be possible to use Celery's built-in autoscaling (thanks, Philip Tzou) by subclassing it. Unfortunately, Celery's autoscale functionality is quite poorly documented.
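For the autoscaling route, the Celery workers guide says the worker is started with --autoscale=max,min (e.g. --autoscale=10,3) and that custom rules can be added by subclassing Autoscaler and pointing the worker_autoscaler setting at the subclass. The sketch below covers only the scaling rule such a subclass might apply, written as a pure function so it can be tried without a running worker. The function name, the thresholds, and the step size are hypothetical, and hooking it into the autoscaler's internal methods is left out because those internals are not documented.

```python
import os


def target_pool_size(load, current, min_procs, max_procs,
                     step=5, lower=4.0, upper=6.0):
    """Return the pool size to move to, given the current load average.

    Hypothetical rule: grow by `step` while the machine is under-used
    (load <= lower), shrink by `step` when it is overloaded (load >= upper),
    and stay put in between. The gap between the two thresholds acts as
    hysteresis, so the pool does not flap on every check.
    The result is always clamped to [min_procs, max_procs].
    """
    if load <= lower:
        desired = current + step
    elif load >= upper:
        desired = current - step
    else:
        desired = current
    return max(min_procs, min(max_procs, desired))


if __name__ == "__main__" and hasattr(os, "getloadavg"):
    # os.getloadavg() is Unix-only; index 1 is the 5-minute load average.
    load5 = os.getloadavg()[1]
    print(target_pool_size(load5, current=150, min_procs=150, max_procs=400))
```

Inside an Autoscaler subclass, the difference between this value and the current process count would be handed to the class's grow/shrink helpers (scale_up/scale_down in the 4.x source); check celery/worker/autoscale.py for your version before relying on them.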

However, after doing some more digging, I came across celery.app.control, which (among other things) allows for scaling by sending messages to the worker via RabbitMQ. Here's a little example of how one could go about this:

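import os
import time

from celery import Celery

# Configure this with the same broker URL the worker uses; with no
# arguments Celery falls back to a RabbitMQ broker on localhost.
app = Celery()
controller = app.control  # the same object as celery.app.control.Control(app)

n = 5  # the number of pool processes to add/remove per step
upper_load_threshold = 6
lower_load_threshold = 4

while True:
    # os.getloadavg() returns the 1-, 5- and 15-minute load averages;
    # index 1 is the 5-minute average.
    load = os.getloadavg()[1]
    if load <= lower_load_threshold:
        controller.pool_grow(n)  # machine is under-used: add processes
    elif load >= upper_load_threshold:
        controller.pool_shrink(n)  # machine is overloaded: remove processes
    time.sleep(10)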

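One limitation of a loop like this is that it has no bounds: while the load stays low it sends another pool_grow every 10 seconds, and while it stays high it keeps shrinking. A minimal bounded variant is sketched below. It assumes this script is the only thing resizing the pool, so it can track the size itself starting from the worker's --concurrency value; BoundedPoolScaler, run_forever, and the limits are hypothetical names and numbers. The controller is passed in, so app.control can be used against a real worker and a stand-in object for testing.

```python
import os
import time


class BoundedPoolScaler:
    """Grow/shrink a Celery worker pool by load average, within fixed limits.

    `controller` is anything with pool_grow(n) and pool_shrink(n) methods,
    e.g. `app.control` from a configured Celery app. The pool size is
    tracked locally, which is only accurate if nothing else resizes the pool.
    """

    def __init__(self, controller, start, min_procs, max_procs,
                 step=5, lower=4.0, upper=6.0):
        self.controller = controller
        self.size = start
        self.min_procs = min_procs
        self.max_procs = max_procs
        self.step = step
        self.lower = lower
        self.upper = upper

    def tick(self, load):
        """Apply one scaling decision for `load`; return the change applied."""
        if load <= self.lower:
            n = min(self.step, self.max_procs - self.size)  # don't exceed max
            if n > 0:
                self.controller.pool_grow(n)
                self.size += n
                return n
        elif load >= self.upper:
            n = min(self.step, self.size - self.min_procs)  # don't go below min
            if n > 0:
                self.controller.pool_shrink(n)
                self.size -= n
                return -n
        return 0  # inside the band, or already at a limit


def run_forever(scaler, interval=10):
    """Poll the 5-minute load average (Unix-only) and rescale every `interval` s."""
    while True:
        scaler.tick(os.getloadavg()[1])
        time.sleep(interval)
```

With a real app this would be run_forever(BoundedPoolScaler(app.control, start=150, min_procs=150, max_procs=400)). Note that pool_grow and pool_shrink are broadcast to every worker listening on the broker unless a destination list is passed, which matters once more than one worker is running.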
Upvotes: 3
