Reputation: 369
I've been trying to understanding asynchronous programming in Python. I tried writing a simple throttler that allows only rate_limit
number of tasks to be processed at a time.
Here's the implementation:
import asyncio
import time
class Throttler:
def __init__ (
self,
rate_limit: int,
retry_interval: float
) -> None:
self.rate_limit = rate_limit
self.retry_interval = retry_interval
self._time_counter = None
self._tasks_counter = 0
self.lock = asyncio.Lock()
async def __aenter__(
self
) -> 'Throttler':
async with self.lock:
print(f'Starting {self._tasks_counter}')
if self._time_counter is None:
pass
else:
difference = time.perf_counter() - self._time_counter
# if difference < self.retry_interval:
# await asyncio.sleep(self.retry_interval - difference)
while True:
if self._tasks_counter < self.rate_limit:
break
else:
print('here')
await asyncio.sleep(self.retry_interval)
if self._time_counter is not None:
print(time.perf_counter() - self._time_counter)
self._time_counter = time.perf_counter()
self._tasks_counter += 1
return self
async def __aexit__(
self,
exc_type,
exc_val,
exc_tb
) -> None:
async with self.lock:
self._tasks_counter -= 1
print(f'Ending {self._tasks_counter}')
throttler = Throttler(rate_limit = 5, retry_interval = 2.0)
async def f ():
async with throttler:
print(42)
await asyncio.sleep(1)
async def main ():
await asyncio.gather(*[f() for i in range(10)])
asyncio.run(main())
I expected that as rate_limit
is set to 5 when I declare the Throttler, it should process at most 5 requests at a time and then wait until one or more of them finish to begin processing other requests. But it doesn't work as I expect it to and raises a RuntimeError as soon as it encounters one of the asyncio.sleep
statements (even the commented one if you uncomment it).
Here's the complete traceback:
Traceback (most recent call last):
File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 288, in f
await asyncio.sleep(1)
File "C:\Programming\Python\lib\asyncio\locks.py", line 120, in acquire
await fut
RuntimeError: Task <Task pending name='Task-3' coro=<f() running at C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py:288> cb=[gather.<locals>._done_callback() at C:\Programming\Python\lib\asyncio\tasks.py:766, gather.<locals>._done_callback() at C:\Programming\Python\lib\asyncio\tasks.py:766]> got Future <Future pending> attached to a different loop
Traceback (most recent call last):
File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 293, in <module>
asyncio.run(main())
File "C:\Programming\Python\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Programming\Python\lib\asyncio\base_events.py", line 642, in run_until_complete
return future.result()
File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 291, in main
await asyncio.gather(*[f() for i in range(10)])
File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 286, in f
async with throttler:
File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 247, in __aenter__
async with self.lock:
File "C:\Programming\Python\lib\asyncio\locks.py", line 14, in __aenter__
await self.acquire()
File "C:\Programming\Python\lib\asyncio\locks.py", line 120, in acquire
await fut
RuntimeError: Task <Task pending name='Task-8' coro=<f() running at C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py:286> cb=[gather.<locals>._done_callback() at C:\Programming\Python\lib\asyncio\tasks.py:766]> got Future <Future pending> attached to a different loop
What am I doing wrong here? Does asyncio.sleep
not use the currently running event loop or am I not understanding how this works? Thanks a lot for your time!
Upvotes: 0
Views: 268
Reputation: 436
This is because you are creating the lock before the asyncio.run runs a loop, try creating the throttler in main() and sending it to f().
Upvotes: 1