Ron Serruya

Reputation: 4446

How can I wait for an object's __del__ to finish before the async loop closes?

I have a class that will have an aiohttp.ClientSession object in it.

Normally, when you use

async with aiohttp.ClientSession() as session:
    # some code

the session is closed automatically because its __aexit__ method is called.

I can't use a context manager because I want to keep the session open for the entire lifetime of the object.

This works:

import asyncio
import aiohttp

class MyAPI:
    def __init__(self):
        self.session = aiohttp.ClientSession()

    def __del__(self):
        # Close connection when this object is destroyed
        print('In __del__ now')
        asyncio.shield(self.session.__aexit__(None, None, None))



async def main():
    api = MyAPI()

asyncio.run(main())

However, if an exception is raised somewhere, the event loop is closed before the __aexit__ method finishes. How can I overcome this?

Stack trace:

Traceback (most recent call last):
  File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 19, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 17, in main
    raise ValueError
ValueError
In __del__ now
Exception ignored in: <function MyAPI.__del__ at 0x7f49982c0e18>
Traceback (most recent call last):
  File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 11, in __del__
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 765, in shield
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 576, in ensure_future
  File "/usr/local/lib/python3.7/asyncio/events.py", line 644, in get_event_loop
RuntimeError: There is no current event loop in thread 'MainThread'.
sys:1: RuntimeWarning: coroutine 'ClientSession.__aexit__' was never awaited
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f49982c2e10>

Upvotes: 29

Views: 11311

Answers (6)

stman

Reputation: 355

On exit, all references to event loops may cease to exist. This answer deals with that case by creating a temporary one:

import asyncio
import aiohttp

class MyAPI:
    def __init__(self):
        self.session = aiohttp.ClientSession()

    def __del__(self):
        # Close the session when this object is garbage-collected
        if not self.session.closed:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop and loop.is_running():
                # A loop is still running: schedule the close on it
                loop.create_task(self.session.close())
            else:
                # No running loop (e.g. interpreter shutdown): use a temporary one
                asyncio.run(self.session.close())

async def main():
    api = MyAPI()

asyncio.run(main())

Upvotes: 0

Tarun M

Reputation: 31

The best way to solve this issue is to explicitly close the session with await self.session.close(). Any other workaround is not worth it: async with can't be used inside a destructor, and a task created to close the session may end up running before the work that still needs it.
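
For illustration, a minimal sketch of this explicit approach (the close() method and the usage in main() are assumptions for this example, not code from the question):

import asyncio
import aiohttp


class MyAPI:
    def __init__(self):
        self.session = aiohttp.ClientSession()

    async def close(self):
        # Release the connection while the event loop is still running
        await self.session.close()


async def main():
    api = MyAPI()
    try:
        ...  # use api.session here
    finally:
        await api.close()

asyncio.run(main())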

Upvotes: 0

guslen

Reputation: 11

Thank you, @alan. I used your example and added some typing to it. I was working with pyppeteer inside a class. I'm not 100% sure it's correct, but at least no exception about a running loop is raised anymore, and it executes as part of __del__. I'm using it as a wrapper function to turn my async code into synchronous code. It's a little ugly, but it works, and I can now safely close the browser instance when the object is destroyed.

My Typed Example

from asyncio import get_event_loop
from typing import TypeVar, Callable, Coroutine, Any

ReturnType = TypeVar("ReturnType")


def async_to_sync(callable_function: Callable[[], Coroutine[Any, Any, ReturnType]]) -> ReturnType:
    loop = get_event_loop()
    if loop.is_running():
        # A loop is already running: schedule the coroutine and return the Task
        # (note that this branch returns an asyncio.Task, not the coroutine's result)
        return loop.create_task(callable_function())
    else:
        # No running loop: block until the coroutine completes and return its result
        return loop.run_until_complete(callable_function())
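
For instance, a hypothetical destructor using this wrapper could look like the following (MyBrowser and the browser.close coroutine are placeholders, not part of the answer above):

class MyBrowser:
    def __init__(self, browser):
        self.browser = browser  # e.g. a pyppeteer Browser instance

    def __del__(self):
        # Run the async cleanup from the synchronous destructor
        async_to_sync(self.browser.close)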

Upvotes: 1

systemime

Reputation: 59

I implemented a way to share a session when writing Django programs (using ASGI). The PID is used to mark each process's session, which makes it convenient for Django to reuse the session across processes.

In actual testing, I can call the shared session directly.

  • Django 3.2
  • uvicorn

aiohttp.py

import os
import asyncio
import aiohttp
import logging

session_list = {}
logger = logging.getLogger(__name__)


class Req:

    @property
    def set_session(self):
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread: create and register one
            loop = asyncio.get_event_loop()
            asyncio.set_event_loop(loop)
        session = aiohttp.ClientSession(loop=loop)
        session_list.update({os.getpid(): session})
        return session

    def __init__(self):
        if session_list.get(os.getpid()):
            self.session = session_list.get(os.getpid())
        else:
            self.session = self.set_session

    async def test(self):
        if session_list:
            session = session_list.get(os.getpid())
            if session and session.closed:
                session_list.pop(os.getpid())
                session = self.set_session
        else:
            session = self.set_session

        if not session or not session.loop.is_running():
            # Recreate the session if it is missing or its loop is no longer running
            session = self.set_session
            logger.warning("session abnormal")
        result = await session.get("http://httpbin.org/get")
        print(result.status)


req = Req()

views.py

from django.http import HttpResponse
from django.shortcuts import render  # noqa
from django.views.generic import View
from django.utils.decorators import classonlymethod

import asyncio

from .aiohttp import req  # adjust this import path to wherever the aiohttp.py module above lives


class TTT(View):

    @classonlymethod
    def as_view(cls, **initkwargs):
        view = super().as_view(**initkwargs)
        view._is_coroutine = asyncio.coroutines._is_coroutine
        return view

    async def get(self, request):
        await req.test()
        return HttpResponse("ok")

Upvotes: -1

alan

Reputation: 3534

As @Martijn Pieters said, you can't force the event loop to wait for an object's __del__ destructor call. However, you can still use the __del__ destructor to close asynchronous resources by first checking whether the loop is running and starting a new loop if it's not. For example, the asyncio Redis module uses this technique when destructing its Client class. For your code specifically, the destructor would be as follows:

import asyncio
import aiohttp


class MyAPI:

    def __init__(self):
        self.session = aiohttp.ClientSession()

    def __del__(self):
        # Close connection when this object is destroyed
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # Loop still running: schedule the close as a task on it
                loop.create_task(self.session.close())
            else:
                # Loop exists but is stopped: run the close to completion on it
                loop.run_until_complete(self.session.close())
        except Exception:
            # No usable event loop (e.g. interpreter shutdown); nothing we can do
            pass

Upvotes: 16

Martijn Pieters

Reputation: 1124238

Don't use a __del__ hook to clean up asynchronous resources. You can't count on it being called at all, let alone control when it'll be called or whether the async loop is still available at that time. You really want to handle this explicitly.

Either make the API an async context manager, or otherwise explicitly clean up resources at exit, with a finally handler, say; the with and async with statements are basically designed to encapsulate resource cleanup traditionally handled in finally blocks.

I'd make the API instance a context manager here:

class MyAPI:
    def __init__(self):
        self.session = aiohttp.ClientSession()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *excinfo):
        await self.session.close()

Note that all that ClientSession.__aexit__() really does is await on self.close(), so the above goes straight to that coroutine.

Then use this in your main loop with:

async def main():
    async with MyAPI() as api:
        pass

Another option is to supply your own session object to the MyAPI instance and take responsibility yourself for closing it when you are done:

class MyAPI:
    def __init__(self, session):
        self.session = session

async def main():
    session = aiohttp.ClientSession()
    try:
        api = MyAPI(session)
        # do things with the API
    finally:
        await session.close()

Upvotes: 38
