Reputation: 1365
I am creating a server which needs to make an external request while responding. To handle concurrent requests I'm using Python's asyncio
library. I have followed some examples from the standard library. It seems however that some of my tasks are destroyed, printing Task was destroyed but it is pending!
to my terminal. After some debugging and research I found a stackoverflow answer which seemed to explain why.
I have created a minimal example demonstrating this effect below. My question is in what way should one counteract this effect? Storing a hard reference to the task, by for example storing asyncio.current_task()
in a global variable mitigates the issue. It also seems to work fine if I wrap the future remote_read.read()
as await asyncio.wait_for(remote_read.read(), 5)
. However I do feel like these solutions are ugly.
# run and visit http://localhost:8080/ in your browser
import asyncio
import gc
async def client_connected_cb(reader, writer):
remote_read, remote_write = await asyncio.open_connection("google.com", 443, ssl=True)
await remote_read.read()
async def cleanup():
while True:
gc.collect()
await asyncio.sleep(1)
async def main():
server = await asyncio.start_server(client_connected_cb, "localhost", 8080)
await asyncio.gather(server.serve_forever(), cleanup())
asyncio.run(main())
I am running Python 3.10 on macOS 10.15.7.
Upvotes: 8
Views: 2498
Reputation: 110476
It looks that by the time being, the only way is actually keeping a reference manually.
Maybe a decorator is something more convenient than having to manually add the code in each async function. I opted for the class design, so that a class attribute can hold the hard-references while the tasks run. (A local variable in the wrapper function would be part of the task-reference cycle, and the garbage collection would trigger all the same):
# run and visit http://localhost:8080/ in your browser
import asyncio
import gc
from functools import wraps
import weakref
class Shielded:
registry = set()
def __init__(self, func):
self.func = func
async def __call__(self, *args, **kw):
self.registry.add(task:=asyncio.current_task())
try:
result = await self.func(*args, **kw)
finally:
self.registry.remove(task)
return result
def _Shielded(func):
# Used along with the print sequence to assert the task was actually destroyed without commenting
async def wrapper(*args, **kwargs):
ref = weakref.finalize(asyncio.current_task(), lambda: print("task destroyed"))
return await func(*args, **kwargs)
return wrapper
@Shielded
async def client_connected_cb(reader, writer):
print("at task start")
#registry.append(asyncio.current_task())
# I've connected this to a socket in an interactive session, I'd explictly .close() for debugging:
remote_read, remote_write = await asyncio.open_connection("localhost", 8060, ssl=False)
print("comensing remote read")
await remote_read.read()
print("task complete")
async def cleanup():
while True:
gc.collect()
await asyncio.sleep(1)
async def main():
server = await asyncio.start_server(client_connected_cb, "localhost", 8080)
await asyncio.gather(server.serve_forever(), cleanup())
asyncio.run(main())
Moreover, I wanted to "really see it", so I created a "fake" _Shielded
decorator that would just log something when the underlying task
got deleted: "task complete" is never printed with it, indeed.
Upvotes: 2