user1768788

Reputation: 1365

Why is an asyncio task garbage collected when opening a connection inside it?

I am creating a server that needs to make an external request while responding. To handle concurrent requests I'm using Python's asyncio library, following some examples from the standard library. However, some of my tasks seem to be destroyed, printing "Task was destroyed but it is pending!" to my terminal. After some debugging and research I found a Stack Overflow answer which seemed to explain why.

I have created a minimal example demonstrating this effect below. My question is: how should one counteract this effect? Storing a hard reference to the task, for example by storing asyncio.current_task() in a global variable, mitigates the issue. It also seems to work fine if I wrap the coroutine remote_read.read() as await asyncio.wait_for(remote_read.read(), 5). However, I feel these solutions are ugly.

# run and visit http://localhost:8080/ in your browser
import asyncio
import gc

async def client_connected_cb(reader, writer):
    remote_read, remote_write = await asyncio.open_connection("google.com", 443, ssl=True)
    await remote_read.read()

async def cleanup():
    while True:
        gc.collect()
        await asyncio.sleep(1)

async def main():
    server = await asyncio.start_server(client_connected_cb, "localhost", 8080)
    await asyncio.gather(server.serve_forever(), cleanup())

asyncio.run(main())

I am running Python 3.10 on macOS 10.15.7.

Upvotes: 8

Views: 2498

Answers (1)

jsbueno

Reputation: 110476

It looks like, for the time being, the only way is to keep a reference manually.

Maybe a decorator is more convenient than manually adding the code to each async function. I opted for a class design, so that a class attribute can hold the hard references while the tasks run. (A local variable in the wrapper function would be part of the task reference cycle, and garbage collection would be triggered all the same):


# run and visit http://localhost:8080/ in your browser
import asyncio
import gc
from functools import wraps
import weakref

class Shielded:
    registry = set()

    def __init__(self, func):
        self.func = func

    async def __call__(self, *args, **kw):
        self.registry.add(task:=asyncio.current_task())
        try:
            result = await self.func(*args, **kw)
        finally:
            self.registry.remove(task)
        return result

def _Shielded(func):
    # Debug-only stand-in for Shielded: it keeps no strong reference and only
    # prints a message when the underlying task is garbage collected.
    async def wrapper(*args, **kwargs):
        ref = weakref.finalize(asyncio.current_task(), lambda: print("task destroyed"))
        return await func(*args, **kwargs)
    return wrapper

@Shielded
async def client_connected_cb(reader, writer):
    print("at task start")
    # For debugging, this connects to a socket opened in an interactive session,
    # which I close explicitly with .close():
    remote_read, remote_write = await asyncio.open_connection("localhost", 8060, ssl=False)
    print("commencing remote read")
    await remote_read.read()
    print("task complete")

async def cleanup():
    while True:
        gc.collect()
        await asyncio.sleep(1)

async def main():
    server = await asyncio.start_server(client_connected_cb, "localhost", 8080)
    await asyncio.gather(server.serve_forever(), cleanup())

asyncio.run(main())

I also wanted to "really see it", so I created a "fake" _Shielded decorator that just logs a message when the underlying task is deleted. With it, "task complete" is indeed never printed.
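
To illustrate the mechanism the debug decorator relies on, here is a small sketch of weakref.finalize on an ordinary object instead of a task. The Resource class and the events list are placeholders invented for this example; the callback runs once the object has been garbage collected.

```python
import gc
import weakref

class Resource:
    """Placeholder object; plain class instances support weak references."""

events = []
obj = Resource()

# Register a callback that runs when obj is garbage collected.
finalizer = weakref.finalize(obj, events.append, "resource destroyed")

alive_before = finalizer.alive  # obj still exists, so the finalizer is pending

del obj        # drop the only strong reference
gc.collect()   # make sure collection has happened

alive_after = finalizer.alive   # the callback has run, so this is now False
```

In _Shielded the tracked object is the task itself, so the "task destroyed" message shows that nothing else was holding a strong reference to it.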

Upvotes: 2
