Reputation: 650
SETUP: I have a large list (100+) of tasks (coroutines) that connect to a REST API database server. The coroutines use a client connection pool. I think the connection pool is cutting me off, because I am not getting all of my results. I also think I could use a Semaphore to limit the number of concurrent connections to the API server, so that I get all of my results before the script finishes. Here's a minimal example:
q = Queue(-1)
progress = tqdm(total=total_hits)
sem = asyncio.Semaphore(1)

for task in asyncio.as_completed(tasks):
    async with sem:
        res = await task
        q.put_nowait(res["data"])
        progress.update(len(res["data"]))
        while res["links"].get("next", None) is not None:
            res = await client.get_json_async(res["links"]["next"])
            q.put_nowait(res["data"])
            progress.update(len(res["data"]))
PROBLEM: I know that I have 10,000 data points to capture. However, I consistently capture only about half of them. I think it's because the client is limiting my TCP connections to the server.
Any ideas?
Upvotes: 1
Views: 1727
Reputation: 3836
You can combine asyncio.Semaphore and asyncio.as_completed in the following way:
import asyncio
import time


async def make_request(url: str, s: asyncio.Semaphore):
    """simulates request"""
    async with s:
        await asyncio.sleep(2)
        return f"{url}: {time.monotonic()}"


async def amain():
    """main wrapper."""
    s = asyncio.Semaphore(5)
    tasks = [make_request(f"url-{i}", s) for i in range(26)]
    for cor in asyncio.as_completed(tasks):
        res = await cor
        print(res)


if __name__ == '__main__':
    asyncio.run(amain())
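Note that the semaphore is acquired inside make_request, so at most five requests are actually in flight at once; the remaining coroutines simply wait on the semaphore before doing any work. In the snippet from your question, async with sem instead wraps await task in the consumer loop, which does not limit the underlying work: asyncio.as_completed schedules all the tasks immediately, and the semaphore only serializes how their results are collected.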
On the other hand, if you need to limit the number of requests per unit of time because of an API rate limit, you will probably need a rate limiter from a third-party library.
I am the author of one such library, BucketRateLimiter, and I would be grateful if you use it.
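For illustration, a naive rate limiter can be built from a semaphore whose slots are released on a delay. The sketch below is a hypothetical helper (SimpleRateLimiter is my own name, not the API of BucketRateLimiter or any other library), just to show the general idea:

import asyncio
import time


class SimpleRateLimiter:
    """Hypothetical sketch: allow at most `rate` requests to start
    per `period` seconds. Not the API of any real library."""

    def __init__(self, rate: int, period: float = 1.0):
        self._sem = asyncio.Semaphore(rate)
        self._period = period

    async def __aenter__(self):
        await self._sem.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        # Return the slot only after `period` seconds have elapsed,
        # so throughput stays at roughly `rate` requests per `period`.
        asyncio.get_running_loop().call_later(self._period, self._sem.release)


async def make_request(url: str, limiter: SimpleRateLimiter):
    async with limiter:
        await asyncio.sleep(0.1)  # simulated request
        return f"{url}: {time.monotonic():.2f}"


async def amain():
    limiter = SimpleRateLimiter(rate=5, period=1.0)  # ~5 requests/second
    tasks = [make_request(f"url-{i}", limiter) for i in range(26)]
    for cor in asyncio.as_completed(tasks):
        print(await cor)


if __name__ == "__main__":
    asyncio.run(amain())

A dedicated library will handle edge cases (bursts, cancellation, clock drift) much better than this, but the delayed-release semaphore shows how rate limiting differs from the plain concurrency limit above.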
Upvotes: 4