如何包装协程.__await__()?

问题描述 投票:0回答:1

我想将每个获取/释放查询记录到 asyncpg 池中。为此我写了以下内容

class CntPoolLogger:
    def __init__(self) -> None:
        self.conn_cnt = 0
        self.event_cnt = 0
        self.log_filename = "pool_usage.log"
        self.file = open(self.log_filename, "a")
    
    def log(self, event: str) -> None:
        self.file.write(f"{self.event_cnt},{event},{self.conn_cnt}\n")
        self.file.flush()

    def acquire(self) -> None:
        self.event_cnt += 1
        self.conn_cnt += 1
        self.log("acquire")

    def release(self) -> None:
        self.event_cnt += 1
        self.conn_cnt -= 1
        self.log("release")
    
    def __del__(self):
        self.file.close()


class LoggingPoolAcquireContext:
    def __init__(self, pool_acquire_context: asyncpg.pool.PoolAcquireContext, cnt_logger: CntPoolLogger):
        self.pool_acquire_context = pool_acquire_context
        self.cnt_logger = cnt_logger

    async def __aenter__(self, *args, **kwargs):
        res = await self.pool_acquire_context.__aenter__(*args, **kwargs)
        self.cnt_logger.acquire()
        return res

    async def __aexit__(self, *args, **kwargs):
        await self.pool_acquire_context.__aexit__(*args, **kwargs)
        self.cnt_logger.release()
    
    def awaitable_wraper(self, awaitable):
        ??????

    def __await__(self, *args, **kwargs):
        self.cnt_logger.acquire()
        return self.pool_acquire_context.__await__(*args, **kwargs)
        
        

class LoggingAsyncPGPool:
    def __init__(self, pool, cnt_logger: CntPoolLogger):
        self._pool = pool
        self.cnt_logger = cnt_logger

    def acquire(self, *args, **kwargs):
        res = LoggingPoolAcquireContext(self._pool.acquire(*args, **kwargs), cnt_logger=self.cnt_logger)
        return res

    async def release(self, *args, **kwargs):
        await self._pool.release(*args, **kwargs)
        self.cnt_logger.release()
    
    async def close(self, *args, **kwargs):
        await self._pool.close()

我希望仅在实际获取/释放连接后更新计数器。因此,我仅在“等待”语句之后更新计数器。然而,在这个方法中

def __await__(self, *args, **kwargs):
        self.cnt_logger.acquire()
        return self.pool_acquire_context.__await__(*args, **kwargs)

我们返回一些应该在其他地方等待的可等待的东西。因此,我在不知道是否实际获取连接的情况下更新计数器。

我的问题是:有没有办法以某种方式包装

self.pool_acquire_context.__await__(*args, **kwargs)

那么当我们在未来某个地方等待它时,它仅在获取连接后才更新计数器?

python postgresql asynchronous python-asyncio asyncpg
1个回答
0
投票

您可以围绕 self.pool_acquire_context 创建一个自定义可等待包装器。await(*args, **kwargs) 仅当等待的操作成功完成时才更新计数器。

class LoggingPoolAcquireContext:
    # Existing code...

    async def __aexit__(self, *args, **kwargs):
        try:
            await self.pool_acquire_context.__aexit__(*args, **kwargs)
        finally:
            self.cnt_logger.release()

    async def __await__(self, *args, **kwargs):
        await self.cnt_logger.acquire()
        try:
            result = await self.pool_acquire_context.__await__(*args, **kwargs)
        except Exception as e:
            self.cnt_logger.release()
            raise e
        else:
            return result
© www.soinside.com 2019 - 2024. All rights reserved.