Asyncpg.pool 创建的连接数超过其 max_size

问题描述 投票:0回答:1
  • asyncpg版本:0.29.0
  • PostgreSQL 版本:“aarch64-unknown-linux-gnu 上的 PostgreSQL 16.0 (Debian 16.0-1.pgdg120+1),由 gcc (Debian 12.2.0-14) 12.2.0 编译,64 位”
  • 您使用 PostgreSQL SaaS 吗?如果有,是哪一个?你能重现吗 本地 PostgreSQL 安装的问题?:不,使用 Docker 化容器
  • Python版本:3.11.5
  • 平台:MacBook-Air Darwin 内核版本 21.1.0:2021 年 10 月 13 日星期三 17:33:24 PDT;根:xnu-8019.41.5~1/RELEASE_ARM64_T8101arm64
  • 你使用pgbouncer吗?:没有
  • 你是否用pip安装了asyncpg?:是
  • 如果您在本地构建 asyncpg,您使用哪个版本的 Cython?:n/a
  • 问题可以在 asyncio 和 asyncio 下重现吗? uvloop:不

我有一个数据库实体的单例,然后用它来写入一些原始数据。当我使用

asyncio.gather()
同时运行写入函数 1000 次时,数据库报告连接数多于
max_size
asyncpg.pool
。例如,测试时有 857 个活动数据库连接,但只有 62 个活动池连接。测试期间没有其他客户端/操作正在运行。当我使用 uvloop 做同样的事情时,如果我尝试运行比池大小更多的任务,它就会崩溃并显示
ConnectionResetError: [Errno 54] Connection reset by peer

这是正常的泳池行为吗?

我使用下面的代码(虽然写入功能已简化):

数据库代码:

  class Database:
      _instance = None
      _pool = None
      db_params = { 
                  'host': os.getenv('DATABASE_HOST'),
                  'port': os.getenv('DATABASE_PORT'),
                  'database': os.getenv('DATABASE_NAME'),
                  'user': os.getenv('DATABASE_USER'),
                  'password': os.getenv('DATABASE_PASSWORD')
              }
  
      def __new__(cls, *args, **kwargs):
          if cls._instance is None:
              cls._instance = super(Database, cls).__new__(cls)
          return cls._instance
  
      @classmethod
      async def get_pool(cls):
          if cls._pool is None:
              cls._pool = await asyncpg.create_pool(**cls.db_params, min_size=1, max_size=150)
          return cls._pool

    @classmethod
    async def write(cls, result):
            pool = await cls.get_pool()
            try:
                    async with pool.acquire() as connection:
                        result = await connection.execute('''
                            INSERT INTO tables.results(
                                result
                            ) VALUES($1)
                        ''', result)
                        return
            except Exception as e:
                raise e

演示编写代码

async def fake_result(i):
    print(f'generating fake result {i}')
    await db.record_result(i)
    return

async def run_functions_concurrently():
   tasks = [fake_result(i) for i in range(1000)]
   await asyncio.gather(*tasks)

def main():
    asyncio.run(run_functions_concurrently())

if __name__ == "__main__":
    main()
python pool asyncpg
1个回答
0
投票

当为任务列表调用

asyncio.gather
时,任务会并发运行,并为每个任务创建一个新的连接池。您会注意到,当我们在
get_pool
方法中添加打印语句来调试此问题时,会为每个运行的任务打印文本“此连接池从未重用”。

      @classmethod
      async def get_pool(cls):
          if cls._pool is None:
              print("This connection pool is never reused.")
              cls._pool = await asyncpg.create_pool(
                  **cls.db_params, min_size=1, max_size=150)
          return cls._pool

我们可以通过在调度任务之前创建池来确保池的重用。

async def run_functions_concurrently():
   await db.get_pool()
   tasks = [fake_result(i) for i in range(1000)]
   await asyncio.gather(*tasks)

现在,我们遇到了另一个问题。每个任务运行都会建立与数据库的连接以执行插入。这个过程的性能不是很好,我们有几个选项来安排我们如何编写结果:

  1. 批量收集任务然后执行。

  2. 将每个任务结果入队并运行循环来处理队列中的结果,然后将它们写入数据库。

让我们看一下批处理的实现,

async def run_batches(items, batch_size):
    await db.get_pool()

    # Split the items into batches
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

    # Run batches concurrently
    for batch in batches:
        await run_functions_concurrently(batch)

def main():
    batch_size = 3
    asyncio.run(
        run_batches(
            [fake_result(i) for i in range(1000)], 
            batch_size)
    )

我们可以使用后一种方法来增强这一点,其中涉及利用工作人员和队列。这样做的好处是可以帮助我们避免过多的连接池。

async def run_workers(jobs, num_workers):
    queue = asyncio.Queue()
    # Number of connection pools created is the value of num_workers
    workers = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]

    for job in jobs:
        await queue.put(job)

    # Send signal to quit workers
    for _ in range(num_workers):
        await queue.put(None)

    # Process all enqueued tasks
    await queue.join()

    # Cancel worker tasks
    for worker_task in workers:
        worker_task.cancel()

    # Wait for worker tasks to complete
    await asyncio.gather(*workers, return_exceptions=True)


def main():
    asyncio.run(run_workers([fake_result(i) for i in range(1000)], 3))
© www.soinside.com 2019 - 2024. All rights reserved.