Arrow
Arrow

Reputation: 369

Why does asyncio.sleep() cause "got Future attached to a different loop"?

I've been trying to understanding asynchronous programming in Python. I tried writing a simple throttler that allows only rate_limit number of tasks to be processed at a time.

Here's the implementation:

import asyncio
import time

class Throttler:

    def __init__ (
            self,
            rate_limit: int,
            retry_interval: float
    ) -> None:

        self.rate_limit = rate_limit
        self.retry_interval = retry_interval

        self._time_counter = None
        self._tasks_counter = 0

        self.lock = asyncio.Lock()

    async def __aenter__(
            self
    ) -> 'Throttler':

        async with self.lock:
            print(f'Starting {self._tasks_counter}')

            if self._time_counter is None:
                pass
            else:
                difference = time.perf_counter() - self._time_counter
                # if difference < self.retry_interval:
                #     await asyncio.sleep(self.retry_interval - difference)

            while True:
                if self._tasks_counter < self.rate_limit:
                    break
                else:
                    print('here')
                    await asyncio.sleep(self.retry_interval)

            if self._time_counter is not None:
                print(time.perf_counter() - self._time_counter)

            self._time_counter = time.perf_counter()
            self._tasks_counter += 1

        return self

    async def __aexit__(
            self,
            exc_type,
            exc_val,
            exc_tb
    ) -> None:

        async with self.lock:
            self._tasks_counter -= 1
            print(f'Ending {self._tasks_counter}')

throttler = Throttler(rate_limit = 5, retry_interval = 2.0)

async def f ():
    async with throttler:
        print(42)
        await asyncio.sleep(1)

async def main ():
    await asyncio.gather(*[f() for i in range(10)])

asyncio.run(main())

I expected that as rate_limit is set to 5 when I declare the Throttler, it should process at most 5 requests at a time and then wait until one or more of them finish to begin processing other requests. But it doesn't work as I expect it to and raises a RuntimeError as soon as it encounters one of the asyncio.sleep statements (even the commented one if you uncomment it).

Here's the complete traceback:

Traceback (most recent call last):
  File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 288, in f
    await asyncio.sleep(1)
  File "C:\Programming\Python\lib\asyncio\locks.py", line 120, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-3' coro=<f() running at C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py:288> cb=[gather.<locals>._done_callback() at C:\Programming\Python\lib\asyncio\tasks.py:766, gather.<locals>._done_callback() at C:\Programming\Python\lib\asyncio\tasks.py:766]> got Future <Future pending> attached to a different loop
Traceback (most recent call last):
  File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 293, in <module>
    asyncio.run(main())
  File "C:\Programming\Python\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Programming\Python\lib\asyncio\base_events.py", line 642, in run_until_complete
    return future.result()
  File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 291, in main
    await asyncio.gather(*[f() for i in range(10)])
  File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 286, in f
    async with throttler:
  File "C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py", line 247, in __aenter__
    async with self.lock:
  File "C:\Programming\Python\lib\asyncio\locks.py", line 14, in __aenter__
    await self.acquire()
  File "C:\Programming\Python\lib\asyncio\locks.py", line 120, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-8' coro=<f() running at C:\Users\Aryan V S\Desktop\Projects\General\Python\Other\Async\throttle.py:286> cb=[gather.<locals>._done_callback() at C:\Programming\Python\lib\asyncio\tasks.py:766]> got Future <Future pending> attached to a different loop

What am I doing wrong here? Does asyncio.sleep not use the currently running event loop or am I not understanding how this works? Thanks a lot for your time!

Upvotes: 0

Views: 268

Answers (1)

eagr
eagr

Reputation: 436

This is because you are creating the lock before the asyncio.run runs a loop, try creating the throttler in main() and sending it to f().

Upvotes: 1

Related Questions