我想将每个获取/释放查询记录到 asyncpg 池中。为此我写了以下内容
class CntPoolLogger:
def __init__(self) -> None:
self.conn_cnt = 0
self.event_cnt = 0
self.log_filename = "pool_usage.log"
self.file = open(self.log_filename, "a")
def log(self, event: str) -> None:
self.file.write(f"{self.event_cnt},{event},{self.conn_cnt}\n")
self.file.flush()
def acquire(self) -> None:
self.event_cnt += 1
self.conn_cnt += 1
self.log("acquire")
def release(self) -> None:
self.event_cnt += 1
self.conn_cnt -= 1
self.log("release")
def __del__(self):
self.file.close()
class LoggingPoolAcquireContext:
def __init__(self, pool_acquire_context: asyncpg.pool.PoolAcquireContext, cnt_logger: CntPoolLogger):
self.pool_acquire_context = pool_acquire_context
self.cnt_logger = cnt_logger
async def __aenter__(self, *args, **kwargs):
res = await self.pool_acquire_context.__aenter__(*args, **kwargs)
self.cnt_logger.acquire()
return res
async def __aexit__(self, *args, **kwargs):
await self.pool_acquire_context.__aexit__(*args, **kwargs)
self.cnt_logger.release()
def awaitable_wraper(self, awaitable):
??????
def __await__(self, *args, **kwargs):
self.cnt_logger.acquire()
return self.pool_acquire_context.__await__(*args, **kwargs)
class LoggingAsyncPGPool:
def __init__(self, pool, cnt_logger: CntPoolLogger):
self._pool = pool
self.cnt_logger = cnt_logger
def acquire(self, *args, **kwargs):
res = LoggingPoolAcquireContext(self._pool.acquire(*args, **kwargs), cnt_logger=self.cnt_logger)
return res
async def release(self, *args, **kwargs):
await self._pool.release(*args, **kwargs)
self.cnt_logger.release()
async def close(self, *args, **kwargs):
await self._pool.close()
我希望仅在实际获取/释放连接后更新计数器。因此,我仅在“等待”语句之后更新计数器。然而,在这个方法中
def __await__(self, *args, **kwargs):
self.cnt_logger.acquire()
return self.pool_acquire_context.__await__(*args, **kwargs)
我们返回一些应该在其他地方等待的可等待的东西。因此,我在不知道是否实际获取连接的情况下更新计数器。
我的问题是:有没有办法以某种方式包装
self.pool_acquire_context.__await__(*args, **kwargs)
那么当我们在未来某个地方等待它时,它仅在获取连接后才更新计数器?
您可以围绕 self.pool_acquire_context 创建一个自定义可等待包装器。await(*args, **kwargs) 仅当等待的操作成功完成时才更新计数器。
class LoggingPoolAcquireContext:
# Existing code...
async def __aexit__(self, *args, **kwargs):
try:
await self.pool_acquire_context.__aexit__(*args, **kwargs)
finally:
self.cnt_logger.release()
async def __await__(self, *args, **kwargs):
await self.cnt_logger.acquire()
try:
result = await self.pool_acquire_context.__await__(*args, **kwargs)
except Exception as e:
self.cnt_logger.release()
raise e
else:
return result