tamerlangasaev

Reputation: 11

How do I create a singleton class in Python 3.x for asyncpg?

I want to set up a connection pool when the class is initialized, as in the code below:

import asyncio
import asyncpg


class DBCommands:

    def __init__(self, uri: str) -> None:
        loop = asyncio.get_event_loop()
        self.pool: asyncpg.pool.Pool = loop.run_until_complete(asyncpg.create_pool(dsn=uri))

    async def get_id_admins(self) -> list:
        # Run the query on the acquired connection rather than on the pool again
        async with self.pool.acquire() as conn:
            result = await conn.fetch("SELECT chat_id FROM users WHERE role_user = 'admin'")
        admins_id = [row[0] for row in result]
        return admins_id

Since there should be only one pool, the implementation above will not work: every new instance creates its own pool. I decided to use a singleton, but I don't understand how to implement it. Below is the version I came up with. What is the best way to solve this problem? I also don't understand where and how best to close the connections. I'm new to design patterns and am just starting to study OOP.

import asyncio
import asyncpg

class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class DBManager(metaclass=Singleton):

    @classmethod
    def connect(cls, uri):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(asyncpg.create_pool(dsn=uri))


class DBCommands:

    def __init__(self, uri) -> None:
        self.uri = uri
        self.pool = DBManager.connect(uri)

    async def get_id_admins(self) -> list:
        # Run the query on the acquired connection rather than on the pool again
        async with self.pool.acquire() as conn:
            result = await conn.fetch("SELECT chat_id FROM users WHERE role_user = 'admin'")
        admins_id = [row[0] for row in result]
        return admins_id

I suspect that opening and closing the pool could be done in __aenter__ and __aexit__.

Upvotes: 1

Views: 2884

Answers (1)

Ionut Ticus

Reputation: 2789

You can use a class attribute and create the pool the first time it's needed in an async function:

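class Database:

    pool = None  # class attribute, shared by every instance

    ...

    async def get_id_admins(self):
        # Create the pool on first use; assign on the class so it stays shared
        if Database.pool is None:
            Database.pool = await asyncpg.create_pool(dsn=...)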
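One caveat with creating the pool lazily: if two coroutines make the first call at the same time, both can see that the pool is None and each create one. A minimal sketch of guarding the check with an asyncio.Lock follows. The names pool_factory and get_pool are hypothetical, introduced only for this illustration; with asyncpg the factory could be something like lambda: asyncpg.create_pool(dsn=uri).

```python
import asyncio


class Database:
    """Holds one pool shared by every instance, created on first use."""

    _pool = None  # class attribute: shared by all instances
    _lock = None  # created on first use, inside a running event loop

    def __init__(self, pool_factory):
        # pool_factory (hypothetical): a zero-argument callable returning an
        # awaitable that resolves to the pool, e.g.
        #     lambda: asyncpg.create_pool(dsn=uri)
        self._pool_factory = pool_factory

    async def get_pool(self):
        cls = type(self)
        # No await between the check and the assignment, so only one lock
        # is ever created even when several coroutines arrive together.
        if cls._lock is None:
            cls._lock = asyncio.Lock()
        async with cls._lock:
            # Only the first coroutine to take the lock creates the pool;
            # later ones find it already set.
            if cls._pool is None:
                cls._pool = await self._pool_factory()
        return cls._pool
```

Query methods would then start with pool = await self.get_pool() instead of reading the attribute directly.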

I generally use a regular class and create a single instance attached to a global object (such as the aiohttp application in a web application), as in:

import logging

import asyncpg


class Database:

    def __init__(self, dsn):
        self.dsn = dsn
        self.pool = None

    async def connect(self):
        """Initialize asyncpg Pool"""
        self.pool = await asyncpg.create_pool(dsn=self.dsn, min_size=2, max_size=4)
        logging.info("successfully initialized database pool")

    async def get_id_admins(self):
        ...

And use it like:

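from aiohttp import web


async def startup(app):
    # Create the pool once, when the application starts
    await app.database.connect()

async def shutdown(app):
    # Close the pool once, when the application stops
    await app.database.pool.close()

def main():
    app = web.Application()
    # app.config.DSN stands for wherever your settings keep the DSN
    app.database = Database(app.config.DSN)
    app.on_startup.append(startup)
    app.on_shutdown.append(shutdown)
    web.run_app(app)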
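Outside a framework with startup and shutdown hooks, the __aenter__/__aexit__ idea from the question can play the same role. The following is only a sketch under assumptions: ManagedDatabase and pool_factory are hypothetical names, and the factory is injected so the class does not hard-code how the pool is built.

```python
import asyncio


class ManagedDatabase:
    """Opens the pool on entering `async with` and closes it on exit."""

    def __init__(self, pool_factory):
        # pool_factory (hypothetical): a zero-argument callable returning an
        # awaitable that resolves to the pool, e.g.
        #     lambda: asyncpg.create_pool(dsn=uri)
        self._pool_factory = pool_factory
        self.pool = None

    async def __aenter__(self):
        self.pool = await self._pool_factory()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # The pool is closed even when the body raised an exception
        await self.pool.close()
        self.pool = None
        return False  # do not swallow exceptions from the body


# Usage with asyncpg (assumes asyncpg is installed and `uri` is a valid DSN):
#     async with ManagedDatabase(lambda: asyncpg.create_pool(dsn=uri)) as db:
#         rows = await db.pool.fetch("SELECT chat_id FROM users")
```

Create the instance once near the program's entry point and pass it to whatever needs it; that keeps a single pool without any singleton machinery.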

Upvotes: 2
