Shery
Shery

Reputation: 1882

aiohttp: rate limiting requests-per-second by domain type

I have already looked here. But still can't get my head around it. Here is how I am currently accomplishing this:

urls_without_rate_limit = 
    [
       'http://httpbin.org/get'
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get'
    ]

urls_with_rate_limit = 
    [
       'http://eu.httpbin.org/get'
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get'
    ]

api_rate = 2
api_limit = 6

loop = asyncio.get_event_loop()
    loop.run_until_complete(
        process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))

    loop.run_until_complete(
        process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
    limit = asyncio.Semaphore(limit)

    f = Fetch(
        rate=rate,
        limit=limit
    )

    tasks = []
    for url in urls:
        tasks.append(f.make_request(url=url))

    results = await asyncio.gather(*tasks)

As you can see it will finish the first round of process then start the second round for rate limits.

It works fine but is there a way I could start both rounds at the same time with different rate limits?

tvm

Upvotes: 1

Views: 975

Answers (1)

cglacet
cglacet

Reputation: 10942

I'll elaborate on what I commented. So you can try to work on you own solution (even though I'll give the complete code here).

You can have a dictionary defining some rules (api -> rate limit per second):

APIS_RATE_LIMIT_PER_S = {
  "http://api.mathjs.org/v4?precision=5": 1,
  "http://api.mathjs.org/v4?precision=2": 3,
}

Which you can then use to decide which semaphore to pick according to the request URL (in practice you would have to do some parsing to get the endpoints you want to control). Once you have that it's just a matter of using the semaphore to make sure you limit the number of simultaneous process executing your request. The last piece to the puzzle is obviously to add a delay before releasing the semaphore.

I'll get for a different version of what is suggested here, but it's basically the same solution. I just made it so you can modify the session object so each call to session.get will automatically apply rate limit control.

def set_rate_limits(session, apis_rate_limits_per_s):
    semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}

    @asynccontextmanager
    async def limit_rate(url):
        await semaphores[url].acquire() 
        start = time.time()
        try:
            yield semaphores[url]
        finally:
            duration = time.time() - start
            await asyncio.sleep(1 - duration)
            semaphores[url].release()

    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                return await coroutine(url, *args, **kwargs)

        return coroutine_with_rate_limit

    session.get = add_limit_rate(session.get)
    session.post = add_limit_rate(session.post)
    return session

Notice that using add_limit_rate you could add rate limit control to any coroutine that has an API endpoint as first argument. But here we will just modify session.get and session.post.

In the end you could use the set_rate_limits function like so:

async def main():
    apis = APIS_RATE_LIMIT_PER_S.keys()
    params = [
        {"expr" : "2^2"},
        {"expr" : "1/0.999"},
        {"expr" : "1/1.001"},
        {"expr" : "1*1.001"},
    ]
    async with aiohttp.ClientSession() as session:
        session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
        api_requests = [get_text_result(session, url, params=p) for url, p  in product(apis, params)]
        text_responses = await asyncio.gather(*api_requests)
        print(text_responses)


async def get_text_result(session, url, params=None):
    result = await session.get(url, params=params)
    return await result.text()

If you run this code you wont see much of what is happening, you could add some print here and there in set_rate_limits to "make sure" the rate limit is correctly enforced:

import time

# [...] change this part : 
    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                ######### debug 
                global request_count
                request_count += 1
                this_req_id = request_count
                rate_lim = APIS_RATE_LIMIT_PER_S[url]
                print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
                ########
                r = await coroutine(url, *args, **kwargs)

            ######### debug 
            print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
            ######### 
            return r

If you run this example asyncio.run(main()), you should get something like:

request #1 ->        1ms     rate 1/s
request #2 ->        2ms     rate 3/s
request #3 ->        3ms     rate 3/s
request #4 ->        3ms     rate 3/s
request #1 <-     1003ms     rate 1/s
request #2 <-     1004ms     rate 3/s
request #3 <-     1004ms     rate 3/s
request #5 ->     1004ms     rate 1/s
request #6 ->     1005ms     rate 3/s
request #4 <-     1006ms     rate 3/s
request #5 <-     2007ms     rate 1/s
request #6 <-     2007ms     rate 3/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

It seems rate limit is respected here, in particular we can have a look at the API with a rate limit of 1 request per second:

request #1 ->        1ms     rate 1/s
request #1 <-     1003ms     rate 1/s
request #5 ->     1004ms     rate 1/s
request #5 <-     2007ms     rate 1/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

On the other hand, this solution is not very satisfying as we artificially add a 1s ping to all our requests. This is because of this part of the code:

await asyncio.sleep(1 - duration)
semaphores[url].release()

The problem here is that we are waiting for the sleep to finish before giving out control back to the event loop (scheduling another task, another request). That can easily be solved using this piece of code instead:

asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))    

With release_after_delay simply being:

async def release_after_delay(semaphore, delay):
    await asyncio.sleep(delay)
    semaphore.release()

The asyncio.create_task function makes the coroutine "run this in the background". Which means in this code that the semaphore will be released later, but that we don't need to wait for it to give control back to the even loop (which means some other request can be scheduled and also that we can get the result in add_limit_rate). In other words, we don't care about the result of this coroutine, we just want it to run at some point in the future (which is probably why this function used to be call ensure_future).

Using this patch, we have the following for the API with rate limit set to one request per second:

request #1 ->        1ms     rate 1/s
request #1 <-      214ms     rate 1/s
request #2 ->     1002ms     rate 1/s
request #2 <-     1039ms     rate 1/s
request #3 ->     2004ms     rate 1/s
request #3 <-     2050ms     rate 1/s
request #4 ->     3009ms     rate 1/s
request #4 <-     3048ms     rate 1/s

It's definitively closer to what we would expect this code to do. We get each response from our API as soon as we can (in this example the ping is 200ms/37ms/46ms/41ms). And the rate limit is respected too.

This is probably not the most beautiful code, but it can be a start for you to work with. Maybe make a clean package with that once you have it working nicely, I guess that's something other people may like to use.

Upvotes: 2

Related Questions