Reputation: 7685
I recently found and reproduced a memory leak caused by the use of asyncio.wait. Specifically, my program periodically executes some function until stop_event
is set. I simplified my program to the snippet below (with a reduced timeout to demonstrate the issue better):
import asyncio

async def main():
    stop_event = asyncio.Event()
    while True:
        # Do stuff here
        await asyncio.wait([stop_event.wait()], timeout=0.0001)

asyncio.run(main())
While this looked innocuous to me, it turns out there's a memory leak here. If you execute the code above, you'll see the memory usage growing to hundreds of MBs in a matter of minutes. This surprised me and took a long time to track down. I was expecting that after the timeout, anything I was waiting for would be cleaned up (since I'm not keeping any references to it myself). However, that turns out not to be the case.
Using gc.get_referrers, I was able to infer that every time I call asyncio.wait(...), a new task is created that holds a reference to the object returned by stop_event.wait(), and that task is kept around forever. Specifically, len(asyncio.all_tasks()) keeps increasing over time. Even after the timeout has passed, the tasks are still there. Only upon calling stop_event.set() do these tasks all finish at once, at which point memory usage decreases drastically.
After discovering that, this note in the documentation made me try asyncio.wait_for instead:
Unlike wait_for(), wait() does not cancel the futures when a timeout occurs.
It turns out that actually behaves like I expected. There are no references kept after the timeout, and memory usage and number of tasks stay flat. This is the code without a memory leak:
import asyncio

async def main():
    stop_event = asyncio.Event()
    while True:
        # Do stuff here
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=0.0001)
        except asyncio.TimeoutError:
            pass

asyncio.run(main())
While I'm happy this is fixed now, I don't really understand this behavior. If the timeout has been exceeded, why keep this task holding a reference around? It seems like that's a recipe for creating memory leaks. The note about not cancelling futures is also not clear to me. What if we don't explicitly cancel the future, but we just don't keep a task holding a reference after the timeout? Wouldn't that work as well?
It would be very much appreciated if anybody could shine some light on this. Thanks a lot!
Upvotes: 7
Views: 1066
Reputation: 628
Briefly: the thread's event loop holds a reference to (and effectively owns) each Task, and returning from wait() has no bearing on that reference/ownership.
Details...
The behavior is expected, by design, because Python asyncio relies on an event loop for the thread. The event loop is created when you call asyncio.run(). The event loop instance itself takes a reference to, manages, and is therefore somewhat an owner of the Task instances until the Tasks complete.
You can see this behavior with a slight modification to your original sample. Simply modify the code to output the thread event loop's count of outstanding tasks, len(asyncio.all_tasks()), and you will see the count grow. Clearly, asyncio.all_tasks() is referring to something that holds the Tasks outside the scope of your own code, which is merely a user of asyncio...
import asyncio

async def main():
    stop_event = asyncio.Event()
    while True:
        # Do stuff here
        print(f"Tasks referenced by asyncio internals: count={len(asyncio.all_tasks())}")
        await asyncio.wait([stop_event.wait()], timeout=0.0001)

asyncio.run(main())
Output:
Tasks referenced by asyncio internals: count=1
Tasks referenced by asyncio internals: count=2
Tasks referenced by asyncio internals: count=3
Tasks referenced by asyncio internals: count=4
Tasks referenced by asyncio internals: count=5
Tasks referenced by asyncio internals: count=6
Tasks referenced by asyncio internals: count=7
Tasks referenced by asyncio internals: count=8
Tasks referenced by asyncio internals: count=9
Tasks referenced by asyncio internals: count=10
Tasks referenced by asyncio internals: count=11
Tasks referenced by asyncio internals: count=12
...
The count is of what the thread's event loop is managing/referencing.
The following from docs may be helpful...
From Task Object...
... Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.
Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations. ...
From asyncio.run...
...
This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.
This function cannot be called when another asyncio event loop is running in the same thread.
...
This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once. ...
Take note that the above mentions a new event loop is created and that run cannot be called when another loop is already running. So run is basically creating the event loop that takes a reference to your coroutine's Task and holds it until the stop_event is set.
Therefore, asyncio.wait timing out has no bearing on the thread's event loop holding a reference to the Task, coroutine, and related objects. You are basically creating many Tasks and letting the event loop own them until they complete.
More Details...
If you'd like to see tangibles, you can look at the CPython source... here is one example showing a reference taken right when a Task is created and given to the event loop...
From .\Python310\Lib\asyncio\base_events.py, the following shows self._ready referencing the Task. The stack below is at a breakpoint during the call to wait.
class BaseEventLoop(events.AbstractEventLoop):

    def __init__(self):
        self._timer_cancelled_count = 0
        self._closed = False
        self._stopping = False
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        ...

    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)  # <--- one example of a reference to the coroutine's
                                    #      Task, but the documentation is enough to go on here.
        return handle
    ...
...
_call_soon (...\Python310\Lib\asyncio\base_events.py:773)
call_soon (...\Python310\Lib\asyncio\base_events.py:754)
create_task (...\Python310\Lib\asyncio\base_events.py:438)
_ensure_future (...\Python310\Lib\asyncio\tasks.py:636)
ensure_future (...\Python310\Lib\asyncio\tasks.py:615)
<setcomp> (...\Python310\Lib\asyncio\tasks.py:382)
wait (...\Python310\Lib\asyncio\tasks.py:382)
main (...\python_asyncio_so.py:9) <--- the sample code showing the "issue"
_run (...\Python310\Lib\asyncio\events.py:80)
_run_once (...\Python310\Lib\asyncio\base_events.py:1896)
run_forever (...\Python310\Lib\asyncio\base_events.py:600)
run_forever (...\Python310\Lib\asyncio\windows_events.py:321)
run_until_complete (...\Python310\Lib\asyncio\base_events.py:633)
run (...\Python310\Lib\asyncio\runners.py:44)
<module> (...\python_asyncio_so.py:11)
_run_code (...\Python310\Lib\runpy.py:86)
_run_module_as_main (...\Python310\Lib\runpy.py:196)
So self is an instance of ProactorEventLoop...
type(self)
<class 'asyncio.windows_events.ProactorEventLoop'>
The class hierarchy of self is:
class BaseEventLoop(events.AbstractEventLoop):
^
|
class BaseProactorEventLoop(base_events.BaseEventLoop):
^
|
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
So the instance of ProactorEventLoop, the thread's event loop, holds a reference to the Tasks you are creating over and over. It continues to manage those tasks.
The asyncio.wait call's timeout has no effect on whether those Tasks are referenced or whether they end. It's merely a timeout for the wait call itself.
Per what @LieRyan mentioned, it's similar to socket select in an abstract sense. Think of asyncio.wait as a call to sample the current state of the supplied Task instances. Upon return, it will tell you what completed, what is pending. The timeout is how long you want to wait to get that state information, but the timeout does not force any particular state upon a Task, if that makes sense.
This was a great question... it's great you noticed the potential leak, easy to consider that!
Upvotes: 2
Reputation: 64913
The key concept to understand here is that the return value of wait() is a (completed, pending) tuple of tasks.
The typical way to use wait()-based code is like this:
async def main():
    stop_event = asyncio.Event()
    pending = [... add things to wait ...]
    while pending:
        completed, pending = await asyncio.wait(pending, timeout=0.0001)
        process(completed)  # e.g. update progress bar
        pending |= more_tasks_to_wait  # wait() returns sets, so use set union
wait() with a timeout isn't used to have one coroutine wait for other coroutines/tasks to finish; rather, its primary use case is periodically flushing completed tasks while letting the unfinished tasks continue "in the background". Automatically cancelling the unfinished tasks isn't desirable, because you usually want to keep waiting for those pending tasks in the next iteration.
This usage pattern resembles the select() system call.
On the other hand, the usage pattern of await wait_for(xyz, timeout) is basically just like doing await xyz with a timeout. It's a common and much simpler use case.
Upvotes: 4