Reputation: 1882
I have already looked here. But still can't get my head around it. Here is how I am currently accomplishing this:
urls_without_rate_limit = [
    'http://httpbin.org/get',
    'http://httpbin.org/get',
    'http://httpbin.org/get',
    'http://httpbin.org/get',
    'http://httpbin.org/get',
]
urls_with_rate_limit = [
    'http://eu.httpbin.org/get',
    'http://eu.httpbin.org/get',
    'http://eu.httpbin.org/get',
    'http://eu.httpbin.org/get',
    'http://eu.httpbin.org/get',
]
api_rate = 2
api_limit = 6
loop = asyncio.get_event_loop()
loop.run_until_complete(
    process(urls=urls_without_rate_limit, rate=0, limit=len(urls_without_rate_limit)))
loop.run_until_complete(
    process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
    limit = asyncio.Semaphore(limit)
    f = Fetch(
        rate=rate,
        limit=limit
    )
    tasks = []
    for url in urls:
        tasks.append(f.make_request(url=url))
    results = await asyncio.gather(*tasks)
As you can see, it finishes the first round of process entirely before starting the second, rate-limited round.
It works fine, but is there a way I could run both rounds at the same time, each with its own rate limit?
tvm
Upvotes: 1
Views: 975
Reputation: 10942
I'll elaborate on what I commented, so you can try to work on your own solution (even though I'll give the complete code here).
You can have a dictionary defining some rules (api -> rate limit per second):
APIS_RATE_LIMIT_PER_S = {
    "http://api.mathjs.org/v4?precision=5": 1,
    "http://api.mathjs.org/v4?precision=2": 3,
}
You can then use it to decide which semaphore to pick according to the request URL (in practice you would have to do some parsing to get the endpoints you want to control). Once you have that, it's just a matter of using the semaphore to limit the number of simultaneous tasks executing your request. The last piece of the puzzle is obviously to add a delay before releasing the semaphore.
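As a sketch of the parsing mentioned above (assuming you want to key the semaphores by host + path rather than by full URL, so the query string is ignored), you could use something like the hypothetical helper below; it is not part of the solution code, just one way to derive the lookup key:

```python
# Hypothetical helper: reduce a full request URL to the endpoint key
# used to look up its semaphore (host + path, query string stripped).
from urllib.parse import urlsplit

def rate_limit_key(url):
    """Return the host + path part of a URL, e.g. 'api.mathjs.org/v4'."""
    parts = urlsplit(url)
    return f"{parts.netloc}{parts.path}"

# Both precision variants map to the same endpoint key:
key = rate_limit_key("http://api.mathjs.org/v4?precision=5")
print(key)  # api.mathjs.org/v4
```

With a keying scheme like this, both example URLs above would share one semaphore; keep full URLs as keys if you want distinct limits per query string.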
I'll go for a different version of what is suggested here, but it's basically the same solution. I just made it so you can modify the session object, so that each call to session.get or session.post will automatically apply rate-limit control.
import asyncio
import time
from contextlib import asynccontextmanager

def set_rate_limits(session, apis_rate_limits_per_s):
    semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}

    @asynccontextmanager
    async def limit_rate(url):
        await semaphores[url].acquire()
        start = time.time()
        try:
            yield semaphores[url]
        finally:
            duration = time.time() - start
            await asyncio.sleep(1 - duration)
            semaphores[url].release()

    def add_limit_rate(coroutine):
        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                return await coroutine(url, *args, **kwargs)
        return coroutine_with_rate_limit

    session.get = add_limit_rate(session.get)
    session.post = add_limit_rate(session.post)
    return session
Notice that using add_limit_rate you could add rate-limit control to any coroutine that has an API endpoint as its first argument. But here we will just modify session.get and session.post.
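To illustrate that point, here is a minimal, self-contained sketch where the same add_limit_rate wrapper is applied, as a decorator, to an arbitrary coroutine (fetch_json is a hypothetical stand-in for a real network call, and the 0.1s delay is shortened from the answer's 1s just to keep the demo fast):

```python
import asyncio
import time
from contextlib import asynccontextmanager

# One semaphore per endpoint; filled in once the event loop is running.
semaphores = {}

@asynccontextmanager
async def limit_rate(url):
    await semaphores[url].acquire()
    start = time.monotonic()
    try:
        yield
    finally:
        # Pad each request to ~0.1s before releasing (1s in the real code).
        await asyncio.sleep(max(0.0, 0.1 - (time.monotonic() - start)))
        semaphores[url].release()

def add_limit_rate(coroutine):
    async def coroutine_with_rate_limit(url, *args, **kwargs):
        async with limit_rate(url):
            return await coroutine(url, *args, **kwargs)
    return coroutine_with_rate_limit

@add_limit_rate  # decorator form: works on any coroutine taking the URL first
async def fetch_json(url, payload=None):
    # Hypothetical stand-in for an actual HTTP request.
    return {"url": url, "payload": payload}

async def main():
    semaphores["http://example.api/a"] = asyncio.Semaphore(1)
    return await asyncio.gather(
        *(fetch_json("http://example.api/a", payload=i) for i in range(3))
    )

results = asyncio.run(main())
print(results)
```

Since the semaphore here allows one holder and each call is padded to 0.1s, the three calls run back to back; asyncio.gather still returns their results in submission order.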
In the end you could use the set_rate_limits function like so:
import aiohttp
from itertools import product

async def main():
    apis = APIS_RATE_LIMIT_PER_S.keys()
    params = [
        {"expr": "2^2"},
        {"expr": "1/0.999"},
        {"expr": "1/1.001"},
        {"expr": "1*1.001"},
    ]
    async with aiohttp.ClientSession() as session:
        session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
        api_requests = [get_text_result(session, url, params=p)
                        for url, p in product(apis, params)]
        text_responses = await asyncio.gather(*api_requests)
        print(text_responses)

async def get_text_result(session, url, params=None):
    result = await session.get(url, params=params)
    return await result.text()
If you run this code you won't see much of what is happening. You could add some print calls here and there in set_rate_limits to "make sure" the rate limit is correctly enforced:
import time

request_count = 0
start = time.time()

# [...] change this part:
def add_limit_rate(coroutine):
    async def coroutine_with_rate_limit(url, *args, **kwargs):
        async with limit_rate(url):
            ######### debug
            global request_count
            request_count += 1
            this_req_id = request_count
            rate_lim = APIS_RATE_LIMIT_PER_S[url]
            print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
            #########
            r = await coroutine(url, *args, **kwargs)
            ######### debug
            print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
            #########
            return r
    return coroutine_with_rate_limit
If you run this example with asyncio.run(main()), you should get something like:
request #1 -> 1ms rate 1/s
request #2 -> 2ms rate 3/s
request #3 -> 3ms rate 3/s
request #4 -> 3ms rate 3/s
request #1 <- 1003ms rate 1/s
request #2 <- 1004ms rate 3/s
request #3 <- 1004ms rate 3/s
request #5 -> 1004ms rate 1/s
request #6 -> 1005ms rate 3/s
request #4 <- 1006ms rate 3/s
request #5 <- 2007ms rate 1/s
request #6 <- 2007ms rate 3/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
It seems the rate limit is respected here. In particular, we can have a look at the API with a rate limit of 1 request per second:
request #1 -> 1ms rate 1/s
request #1 <- 1003ms rate 1/s
request #5 -> 1004ms rate 1/s
request #5 <- 2007ms rate 1/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
On the other hand, this solution is not very satisfying, as we artificially add a 1s ping to all our requests. This is because of this part of the code:
await asyncio.sleep(1 - duration)
semaphores[url].release()
The problem here is that we wait for the sleep to finish before giving control back to the event loop (scheduling another task, another request). That can easily be solved by using this piece of code instead:
asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))
With release_after_delay simply being:
async def release_after_delay(semaphore, delay):
    await asyncio.sleep(delay)
    semaphore.release()
The asyncio.create_task function makes the coroutine "run in the background". In this code it means that the semaphore will be released later, but that we don't need to wait for it before giving control back to the event loop (so another request can be scheduled, and we can also return the result in add_limit_rate). In other words, we don't care about the result of this coroutine; we just want it to run at some point in the future (which is probably why this function used to be called ensure_future).
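That "run in the background" behaviour can be seen in isolation with a small sketch (timings are illustrative: a 0.2s delay instead of 1s, and keeping a reference to the task so it can be awaited before the loop shuts down):

```python
import asyncio
import time

async def release_after_delay(semaphore, delay):
    await asyncio.sleep(delay)
    semaphore.release()

async def main():
    sem = asyncio.Semaphore(1)
    await sem.acquire()
    t0 = time.monotonic()
    # Schedule the release without awaiting it: control returns immediately.
    task = asyncio.create_task(release_after_delay(sem, 0.2))
    elapsed_after_schedule = time.monotonic() - t0
    await sem.acquire()  # blocks until the background task releases
    elapsed_after_acquire = time.monotonic() - t0
    await task  # keep a reference and await it so it isn't garbage-collected
    return elapsed_after_schedule, elapsed_after_acquire

sched, acq = asyncio.run(main())
print(f"scheduling took {sched*1000:.0f}ms; acquire waited {acq*1000:.0f}ms")
```

Scheduling the task costs almost nothing, while the second acquire only succeeds once the background coroutine has slept and released the semaphore.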
Using this patch, we have the following for the API with rate limit set to one request per second:
request #1 -> 1ms rate 1/s
request #1 <- 214ms rate 1/s
request #2 -> 1002ms rate 1/s
request #2 <- 1039ms rate 1/s
request #3 -> 2004ms rate 1/s
request #3 <- 2050ms rate 1/s
request #4 -> 3009ms rate 1/s
request #4 <- 3048ms rate 1/s
It's definitely closer to what we would expect this code to do. We get each response from our API as soon as we can (in this example the pings are 200ms/37ms/46ms/41ms), and the rate limit is respected too.
This is probably not the most beautiful code, but it can be a start for you to work with. Maybe make a clean package out of it once you have it working nicely; I guess that's something other people might like to use.
Upvotes: 2