Reputation: 61
I first make a simple request to get a JSON response containing all the names. I then iterate over the names, create an awaitable task for each one, store the tasks in a list called "tasks", and finally gather them all.
The problem is that the server limits how many API requests it will serve per minute, but no matter how low I set the semaphore value, this code makes the calls in the same amount of time (too fast for the server's limit), as if the semaphore didn't exist at all. How do I control the API call rate?
<some code>
url = "http://example.com/"
response = requests.request("GET", url, headers=headers)

async def get_api(session, url_dev):
    async with session.get(url_dev, headers=headers) as resp:
        result = await resp.json()
        return result

async def main():
    async with aiohttp.ClientSession() as session:
        sem = asyncio.Semaphore(1)
        tasks = []
        for i in response.json()["Names"]:
            url_dev = "https://example.com/example/" + str(i["Id"])
            await sem.acquire()
            async with sem:
                tasks.append(asyncio.create_task(get_api(session, url_dev)))
        full_list = list()
        async with sem:
            full_list = await asyncio.gather(*tasks)

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
Upvotes: 1
Views: 4524
Reputation: 71471
You should acquire and release the semaphore around the actual request to the API endpoint inside get_api, not around creating the tasks and gathering the results. Also, for your use case there is no need to call sem.acquire and sem.release manually; use its context manager instead:
async def get_api(session, sem: asyncio.Semaphore, url_dev):
    # Use both the semaphore and session.get in a single context manager:
    # the semaphore now properly blocks further requests once the limit
    # has been reached, until earlier ones have finished.
    async with sem, session.get(url_dev, headers=headers) as resp:
        result = await resp.json()
        return result

async def main():
    sem = asyncio.Semaphore(1)
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in response.json()["Names"]:
            url_dev = "https://example.com/example/" + str(i["Id"])
            # Pass the semaphore instance to get_api
            tasks.append(asyncio.create_task(get_api(session, sem, url_dev)))
        full_list = await asyncio.gather(*tasks)
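For completeness, main can then be driven with the same entry point as your original script; a minimal sketch, keeping the Windows-specific event loop policy from your question:
# Same entry point as in the question; assumes the Windows selector
# event loop policy is still wanted.
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())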
Upvotes: 1
Reputation: 23684
A semaphore by itself really isn't the right tool for rate limiting unless you also release it from a separate loop, or add a sleep inside the critical section. You could also schedule a follow-up task that sleeps and then releases the semaphore.
Further, you've created all of the tasks inside the critical section, but the actual requests run asynchronously, outside that critical section, because you scheduled them with create_task. The semaphore needs to be used inside the get_api coroutine itself.
Also, you're acquiring the semaphore twice; either use the acquire method with try / finally, or use async with, but not both. See the docs
Here is a simple script illustrating a task loop that never starts more than 5 tasks per 5-second interval:
import asyncio

async def dequeue(sem, sleep):
    """Wait for a duration and then increment the semaphore"""
    try:
        await asyncio.sleep(sleep)
    finally:
        sem.release()

async def task(sem, sleep, data):
    """Decrement the semaphore, schedule an increment, and then work"""
    await sem.acquire()
    asyncio.create_task(dequeue(sem, sleep))
    # logic here
    print(data)

async def main():
    max_concurrent = 5
    sleep = 5
    sem = asyncio.Semaphore(max_concurrent)
    tasks = [asyncio.create_task(task(sem, sleep, i)) for i in range(15)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
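Applied to your code, get_api would take the semaphore, schedule the dequeue, and then perform the request; a rough sketch, assuming headers and the aiohttp session are set up as in your question:
async def get_api(session, sem, sleep, url_dev):
    # Take a slot, then schedule its release after `sleep` seconds,
    # so no more than `max_concurrent` requests start per interval.
    await sem.acquire()
    asyncio.create_task(dequeue(sem, sleep))
    async with session.get(url_dev, headers=headers) as resp:
        return await resp.json()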
You could also wrap this logic in a decorator if you want to get really fancy:
import asyncio
import functools

def rate_limited(max_concurrent, duration):
    def decorator(func):
        semaphore = asyncio.Semaphore(max_concurrent)

        async def dequeue():
            try:
                await asyncio.sleep(duration)
            finally:
                semaphore.release()

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            await semaphore.acquire()
            asyncio.create_task(dequeue())
            return await func(*args, **kwargs)
        return wrapper
    return decorator
Then the code becomes the following (note that the semaphore is created outside of asyncio.run, so you need to use the default event loop for it to work properly):
@rate_limited(max_concurrent=5, duration=5)
async def task(i):
    print(i)

async def main():
    tasks = [asyncio.create_task(task(i)) for i in range(7)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Upvotes: 4