monstermac77

Reputation: 330

Celery Workers' Utilization Decreases With More Workers

I'm trying to make thousands of GET requests in the smallest amount of time possible. I need to do so in a scalable way: doubling the number of servers I use to make the requests should halve the time to complete for a fixed number of URLs.

I'm using Celery with the eventlet pool and RabbitMQ as the broker. I'm spawning one worker process on each worker server with --concurrency 100 and have a dedicated master server issuing tasks (the code below). I'm not getting the results I expect: the time to complete is not reduced at all when doubling the number of worker servers used.

It appears as though as I add more worker servers, the utilization of each worker goes down (as reported by Flower). For example, with 2 workers, throughout execution the number of active threads per worker hovers in the 80 to 90 range (as expected, since concurrency is 100). However, with 6 workers, the number of active threads per worker hovers in the 10 to 20 range.

It's almost as if the queue size is too small, or as if the worker servers can't pull tasks off the queue fast enough to stay fully utilized, and adding more workers makes it harder for each of them to pull tasks off the queue quickly.

import time

from celery import group
from tasks import fetch_url   # fetch_url is the Celery task; adjust the module name to your project

urls = ["https://...", ..., "https://..."]

tasks = [fetch_url.s(num, url) for num, url in enumerate(urls, start=1)]

job = group(tasks)
start = time.time()
res = job.apply_async()
res.join()                     # blocks until every task in the group has finished
print(time.time() - start)

Update: I have attached a graph of succeeded tasks vs. time when using 1, 2, and up to 5 worker servers. As you can see, the rate of task completion doubles going from 1 worker server to 2, but as I add more servers the rate of task completion begins to level off.

[graph: succeeded tasks vs. time for 1 to 5 worker servers]

Upvotes: 4

Views: 1648

Answers (1)

temoto

Reputation: 5577

For future readers, these are the actions that helped, with the most significant benefit first:

  • Group several small work units into one Celery task (chunking, explained below)
  • Switch the Celery broker from RabbitMQ to Redis (a minimal config sketch follows this list)
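
A minimal sketch of pointing Celery at a Redis broker instead of RabbitMQ; the app name and URLs below are illustrative, not values from the question:

from celery import Celery

app = Celery(
    "fetcher",                           # illustrative app name
    broker="redis://localhost:6379/0",   # broker switched from RabbitMQ to Redis
    backend="redis://localhost:6379/1",  # result backend, needed for res.join()
)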

More hints, not mentioned in the original comment discussion, so their benefit for this question is unknown:

  • Use httplib2, urllib3, or another efficient HTTP library; requests burns CPU for no good reason.
  • Use an HTTP connection pool. Check and make sure you reuse persistent connections to the target servers (see the sketch after this list).
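
A minimal sketch of connection reuse with urllib3; the pool sizes are illustrative, and http_fetch matches the placeholder name used in the code below:

import urllib3

# One shared PoolManager keeps persistent, keep-alive connections per host.
http = urllib3.PoolManager(num_pools=10, maxsize=100)   # illustrative sizes

def http_fetch(url):
    return http.request("GET", url)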

Chunking explained.

Before chunking

urls = [...]

# app is the Celery application; http_fetch and process are placeholders
@app.task
def fetch_url(url):
    response = http_fetch(url)
    return process(response.body)

for url in urls:
    fetch_url.apply_async((url,))

So the task queue contains N = len(urls) tasks; each task fetches a single URL and performs some calculations on the response.

With chunking

def chunk(xs, n):
    # yield successive n-sized slices of xs
    for i in range(0, len(xs), n):
        yield xs[i:i + n]

chunks = list(chunk(urls, chunksize))   # e.g. [[url1, url2, url3], [url4, url5, url6], ...]

import eventlet

@app.task
def fetch_chunk(url_chunk):
    pool = eventlet.GreenPool()
    return {
        response.url: process(response)
        for response in pool.imap(http_fetch, url_chunk)
    }

for url_chunk in chunks:
    fetch_chunk.apply_async((url_chunk,))

Now the task queue contains M = len(urls) / chunksize tasks; each task fetches chunksize URLs and processes all of the responses. You now have to multiplex the concurrent URL fetches inside a single chunk; here it is done with an Eventlet GreenPool.
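
For reference, a hedged sketch of how the dispatch loop from the question could keep its group/join timing pattern while sending chunks instead of single URLs; chunksize and fetch_chunk are the illustrative names used above:

import time
from celery import group

chunksize = 100   # tuning knob, illustrative value
job = group(fetch_chunk.s(url_chunk) for url_chunk in chunk(urls, chunksize))

start = time.time()
res = job.apply_async()
res.join()        # blocks until every chunk task has finished
print(time.time() - start)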

Note that, because this is Python, it is likely beneficial to first perform all of the network IO and only then perform all of the CPU calculations on the responses in the chunk, amortizing the CPU load across multiple Celery workers.
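
A minimal sketch of that ordering, reusing the placeholder http_fetch and process helpers from above:

import eventlet

def fetch_chunk_io_first(url_chunk):
    pool = eventlet.GreenPool()
    responses = list(pool.imap(http_fetch, url_chunk))  # all network IO first
    return {r.url: process(r) for r in responses}       # then the CPU-bound processing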

All of the code in this answer shows the general direction only; you should implement a better version with less copying and fewer allocations.

Upvotes: 2
