j7skov

Reputation: 650

How do I use a Semaphore with asyncio.as_completed in Python?

SETUP: I have a large list (100+) of tasks (coroutines) that connect to a REST API database server. The coroutines use a client connection pool. I think the connection pool is cutting me off, because I am not getting all my results. I also think I could use a Semaphore to limit the concurrent connections to the API server and get all my results before my script finishes. Here's a minimal example:

q = Queue(-1)
progress = tqdm(total=total_hits)
sem = asyncio.Semaphore(1)
for task in asyncio.as_completed(tasks):
    async with sem:
        res = await task
        q.put_nowait(res["data"])
        progress.update(len(res["data"]))
        while res["links"].get("next", None) is not None:
            res = await client.get_json_async(res["links"]["next"])
            q.put_nowait(res["data"])
            progress.update(len(res["data"]))

PROBLEM: I know that I have 10,000 data points to capture. However, I consistently only capture about half of those. I think it's because the client is limiting my TCP connections to the server.

Any ideas?

Upvotes: 1

Views: 1727

Answers (1)

Artiom Kozyrev

Reputation: 3836

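You can combine asyncio.Semaphore and asyncio.as_completed in the following way: acquire the semaphore inside each coroutine, so that only a limited number of requests run at once while as_completed still yields results as they finish.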

import asyncio
import time


async def make_request(url: str, s: asyncio.Semaphore):
    """simulates request"""
    async with s:
        await asyncio.sleep(2)

    return f"{url}: {time.monotonic()}"


async def amain():
    """main wrapper."""
    s = asyncio.Semaphore(5)
    tasks = [make_request(f"url-{i}", s) for i in range(26)]
    for cor in asyncio.as_completed(tasks):
        res = await cor
        print(res)


if __name__ == '__main__':
    asyncio.run(amain())
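
Applied to the paginated case in the question, the semaphore would go inside the coroutine so that it covers the first request and every follow-up "next" request. The following is only a sketch under assumptions: fake_get_json and the PAGES dictionary are hypothetical stand-ins for the question's client.get_json_async and the real API, and fetch_all_pages and collect are illustrative names.

```python
import asyncio

# Hypothetical in-memory "API": each URL maps to a page of data
# and an optional link to the next page.
PAGES = {
    "a/1": {"data": [1, 2], "links": {"next": "a/2"}},
    "a/2": {"data": [3], "links": {}},
    "b/1": {"data": [4, 5, 6], "links": {}},
}


async def fake_get_json(url: str) -> dict:
    """Stand-in for the question's client.get_json_async (assumption)."""
    await asyncio.sleep(0.01)
    return PAGES[url]


async def fetch_all_pages(url: str, sem: asyncio.Semaphore) -> list:
    """Fetch one URL and follow its "next" links, one request at a time."""
    items = []
    next_url = url
    while next_url is not None:
        async with sem:  # held only while a request is in flight
            res = await fake_get_json(next_url)
        items.extend(res["data"])
        next_url = res["links"].get("next")
    return items


async def collect(urls: list, limit: int = 2) -> list:
    """Run all fetches with at most `limit` requests in flight."""
    sem = asyncio.Semaphore(limit)
    results = []
    for fut in asyncio.as_completed([fetch_all_pages(u, sem) for u in urls]):
        results.extend(await fut)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(collect(["a/1", "b/1"]))))
```

Because every request, including the follow-up pages, passes through the same semaphore, the number of open connections never exceeds the limit, which is not the case when the semaphore only wraps the loop over as_completed.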

On the other hand, if you need to limit the number of requests because of an API rate limit, a semaphore is not enough and you will probably need a rate limiter from a third-party library.

I am the author of one such library, BucketRateLimiter, and I would be grateful if you used it.
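
For illustration only, the difference between limiting concurrency and limiting request rate can be sketched with the standard library. This is not BucketRateLimiter's API: IntervalLimiter, limited_request and demo are hypothetical names, and the limiter simply spaces request starts at least `interval` seconds apart.

```python
import asyncio
import time


class IntervalLimiter:
    """Hypothetical helper: lets one caller through every `interval` seconds."""

    def __init__(self, interval: float):
        self.interval = interval
        self._lock = asyncio.Lock()
        self._next_slot = 0.0  # earliest monotonic time for the next request

    async def wait(self) -> None:
        async with self._lock:
            now = time.monotonic()
            delay = self._next_slot - now
            if delay > 0:
                await asyncio.sleep(delay)
            # Reserve the following slot relative to the one just used.
            self._next_slot = max(now, self._next_slot) + self.interval


async def limited_request(i: int, limiter: IntervalLimiter) -> float:
    """Simulated request: waits for a slot, then reports its start time."""
    await limiter.wait()
    return time.monotonic()


async def demo(n: int = 5, interval: float = 0.05) -> list:
    limiter = IntervalLimiter(interval)  # created inside the running loop
    starts = []
    for fut in asyncio.as_completed([limited_request(i, limiter) for i in range(n)]):
        starts.append(await fut)
    return sorted(starts)


if __name__ == "__main__":
    times = asyncio.run(demo())
    print([round(t - times[0], 2) for t in times])
```

A semaphore caps how many requests are in flight at once; a limiter like this caps how often requests start, which is what an API quota such as "N requests per second" usually requires.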

Upvotes: 4
