如何为asyncpg创建类Python 3.x(单例)?

问题描述 投票:1回答:1

我想使用以下方法初始化类时组织一个连接池

import asyncio
import asyncpg


class DBCommands:

    def __init__(self, uri: str) -> None:
        loop = asyncio.get_event_loop()
    self.pool: asyncpg.pool.Pool = loop.run_until_complete(asyncpg.create_pool(dsn=uri))

    async def get_id_admins(self) -> list:
        async with self.pool.acquire():
            result = await self.pool.fetch("SELECT chat_id FROM users WHERE role_user = 'admin'")
        admins_id = [row[0] for row in result]
        return admins_id

由于该池应该是一个池,因此在上述实现中将无法使用。我决定使用单例,但我不知道如何实现。以下是我想到的版本。告诉我如何最好地解决这个问题。另外,我不了解如何最好以及在哪里关闭连接。我是使用模式的新手,刚刚开始研究OOP。

import asyncio
import asyncpg

class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class DBManager(metaclass=Singleton):

    @classmethod
    def connect(cls, uri):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(asyncpg.create_pool(dsn=uri))


class DBCommands:

    def __init__(self, uri) -> None:
        self.uri = uri
        self.pool = DBManager.connect(uri)

    async def get_id_admins(self) -> list:
        async with self.pool.acquire():
            result = await self.pool.fetch("SELECT chat_id FROM users WHERE role_user = 'admin'")
        admins_id = [row[0] for row in result]
        return admins_id

我假设可以将打开和关闭池添加到__aenter____aexit__

python-3.x postgresql singleton python-asyncio asyncpg
1个回答
0
投票
您可以使用class attribute并在异步功能中首次需要该池时创建它:

class Database: self.pool = None ... async def get_id_admins(self) if self.pool is None: self.pool = await asyncpg.create_pool(dsn=...`).

我通常使用常规类并创建附加到全局对象的单个实例(如Web应用程序的aiohttp应用程序,如下所示:

class Database: def __init__(self, dsn): self.dsn = dsn self.pool = None async def connect(self): """Initialize asyncpg Pool""" self.pool = await asyncpg.create_pool(dsn=self.dsn, min_size=2, max_size=4) logging.info("successfully initialized database pool") async def get_id_admins(self): ...

并像这样使用它:

async def startup(app): await app.database.connect() async def shutdown(app): await app.database.pool.close() def main(): app = web.Application() app.database = Database(app.config.DSN) app.on_startup.append(startup) app.on_shutdown.append(shutdown)

© www.soinside.com 2019 - 2024. All rights reserved.