Reputation: 330
I'm trying to make thousands of GET requests in the smallest amount of time possible. I need to do so in a scalable way: doubling the number of servers I use to make the requests should halve the time to complete for a fixed number of URLs.
I'm using Celery with the eventlet pool and RabbitMQ as the broker. I'm spawning one worker process on each worker server with --concurrency 100
and have a dedicated master server issuing tasks (the code below). I'm not getting the results I expect: the time to complete is not reduced at all when doubling the number of worker servers used.
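For reference, the worker invocation on each server looks roughly like this (proj stands in for the actual app module name):

celery -A proj worker --pool=eventlet --concurrency=100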
It appears as though as I add more worker servers, the utilization of each worker goes down (as reported by Flower). For example, with 2 workers, throughout execution the number of active threads per worker hovers in the 80 to 90 range (as expected, since concurrency is 100). However, with 6 workers, the number of active threads per worker hovers in the 10 to 20 range.
It's almost as if the queue size is too small, or as if the worker servers can't pull tasks off the queue fast enough to stay fully utilized, and adding more workers only makes it harder for each of them to pull tasks quickly.
import time
from celery import group

# fetch_url is the Celery task defined in the workers' task module
urls = ["https://...", ..., "https://..."]
tasks = []
num = 0
for url in urls:
    num = num + 1
    tasks.append(fetch_url.s(num, url))   # one task signature per URL
job = group(tasks)

start = time.time()
res = job.apply_async()   # dispatch the whole group at once
res.join()                # block until every task has finished
print(time.time() - start)
Update: I have attached a graph of succeeded tasks vs. time when using 1 worker server, 2 worker servers, and so on up to 5 worker servers. As you can see, the rate of task completion doubles going from 1 worker server to 2 worker servers, but as I add more servers, the rate of task completion begins to level off.
Upvotes: 4
Views: 1648
Reputation: 5577
For future readers. Actions that helped, most significant benefit first:

- Chunking, explained in detail below.

More useful hints, not mentioned in the original comment discussion and therefore of unknown significance for this question:

- httplib2 or urllib3 or another better HTTP library; requests burns CPU for no good reason (a urllib3-based sketch follows this list).
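As a minimal illustration of that hint, the http_fetch placeholder used in the sketches below could sit on top of urllib3. The Fetched wrapper is an invented helper for this answer, not part of any library:

import collections
import urllib3

Fetched = collections.namedtuple("Fetched", "url status body")
_http = urllib3.PoolManager()          # one shared connection pool

def http_fetch(url):
    # GET a single URL and keep only the pieces the later sketches use
    r = _http.request("GET", url)
    return Fetched(url=url, status=r.status, body=r.data)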
Chunking explained.
urls = [...]
# before: one Celery task per URL
@celery.task
def task(url):
    response = http_fetch(url)           # network IO for a single URL
    return process(response.body)        # CPU work on that one response

for url in urls:
    task.apply_async(args=(url,))        # one queue message per URL
So the task queue contains N = len(urls) tasks; each task fetches a single URL and performs some calculations on the response.
def chunk(xs, n):
    # yield successive chunks of n items from xs
    while xs:
        g, xs = xs[:n], xs[n:]
        yield g

chunks = [[url1, url2, url3], [url4, url5, url6], ...]
import eventlet

# after: one Celery task per chunk of URLs
@celery.task
def task(chunk):
    # multiplex the fetches for one chunk across eventlet green threads
    pool = eventlet.GreenPool()
    result = {
        response.url: process(response)
        for response in pool.imap(http_fetch, chunk)
    }
    return result
for c in chunks:
    task.apply_async(args=(c,))          # one queue message per chunk
Now the task queue contains M = len(urls)/chunksize tasks; each task fetches chunksize URLs and processes all of the responses. Now you have to multiplex the concurrent URL fetches inside a single chunk; here it is done with an eventlet GreenPool.
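On the dispatch side, the group/join driver from the question changes only in what each signature carries; a minimal sketch, assuming the chunked task above and that each task returns a dict of url -> result:

from celery import group

job = group(task.s(c) for c in chunks)   # one signature per chunk, not per URL
res = job.apply_async()

merged = {}
for partial in res.join():               # each task returns a url -> result dict
    merged.update(partial)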
Note: because this is Python, it is likely beneficial to first perform all of the network IO and only then perform all of the CPU calculations on the responses in the chunk, amortizing the CPU load across multiple Celery workers.
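A sketch of that variant of the chunk task, fetching everything first and processing afterwards:

import eventlet

@celery.task
def task(chunk):
    pool = eventlet.GreenPool()
    # phase 1: network IO only, gather every response for the chunk
    responses = list(pool.imap(http_fetch, chunk))
    # phase 2: CPU-bound processing, done after the sockets are finished
    return {r.url: process(r) for r in responses}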
All of the code in this answer shows the general direction only. You must implement a better version with less copying and fewer allocations.
Upvotes: 2