I have a singleton for a database entity, which I use to write some raw data. When I run the write function 1000 times concurrently with asyncio.gather(), the database reports more connections than the max_size of the asyncpg pool. For example, during one test there were 857 active database connections but only 62 active pool connections, and no other clients or operations were running at the time. When I do the same thing with uvloop, it crashes with ConnectionResetError: [Errno 54] Connection reset by peer as soon as I try to run more tasks than the pool size.
Is this normal pool behaviour?
I am using the code below (the write function is simplified):
Database code:
import os

import asyncpg


class Database:
    _instance = None
    _pool = None
    db_params = {
        'host': os.getenv('DATABASE_HOST'),
        'port': os.getenv('DATABASE_PORT'),
        'database': os.getenv('DATABASE_NAME'),
        'user': os.getenv('DATABASE_USER'),
        'password': os.getenv('DATABASE_PASSWORD')
    }

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(Database, cls).__new__(cls)
        return cls._instance

    @classmethod
    async def get_pool(cls):
        if cls._pool is None:
            cls._pool = await asyncpg.create_pool(**cls.db_params, min_size=1, max_size=150)
        return cls._pool

    @classmethod
    async def write(cls, result):
        pool = await cls.get_pool()
        try:
            async with pool.acquire() as connection:
                await connection.execute('''
                    INSERT INTO tables.results(
                        result
                    ) VALUES($1)
                ''', result)
            return
        except Exception as e:
            raise e
Demo code that performs the writes:
import asyncio

db = Database()

async def fake_result(i):
    print(f'generating fake result {i}')
    await db.write(i)
    return

async def run_functions_concurrently():
    tasks = [fake_result(i) for i in range(1000)]
    await asyncio.gather(*tasks)

def main():
    asyncio.run(run_functions_concurrently())

if __name__ == "__main__":
    main()
When asyncio.gather is called on the task list, the tasks run concurrently and a new connection pool gets created for each task: get_pool awaits asyncpg.create_pool before assigning cls._pool, so every task that calls it while the first pool is still being created sees cls._pool is None and builds its own pool. You will notice that when we add a print statement to the get_pool method to debug this, the text "This connection pool is never reused." is printed for each running task.
@classmethod
async def get_pool(cls):
    if cls._pool is None:
        print("This connection pool is never reused.")
        cls._pool = await asyncpg.create_pool(
            **cls.db_params, min_size=1, max_size=150)
    return cls._pool
We can make sure the pool is reused by creating it before scheduling the tasks.
async def run_functions_concurrently():
    await db.get_pool()
    tasks = [fake_result(i) for i in range(1000)]
    await asyncio.gather(*tasks)
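If you would rather not depend on the pool being created before the tasks are scheduled, another option is to guard pool creation with an asyncio.Lock so that only the first caller actually creates the pool. This is a sketch rather than the original code: the _create_pool stub below stands in for asyncpg.create_pool so the example is self-contained, and it counts how many pools get built.

```python
import asyncio

class Database:
    _pool = None
    _lock = None
    created = 0  # counts how many "pools" were built, for demonstration

    @classmethod
    async def _create_pool(cls):
        # Stand-in for asyncpg.create_pool(): a slow async factory
        await asyncio.sleep(0.01)
        cls.created += 1
        return object()

    @classmethod
    async def get_pool(cls):
        if cls._lock is None:
            # No await between the check and the assignment, so creating
            # the lock lazily is safe on a single event loop
            cls._lock = asyncio.Lock()
        if cls._pool is None:
            async with cls._lock:
                # Re-check inside the lock: another task may have finished
                # creating the pool while we were waiting
                if cls._pool is None:
                    cls._pool = await cls._create_pool()
        return cls._pool

async def demo():
    pools = await asyncio.gather(*(Database.get_pool() for _ in range(1000)))
    print(Database.created)                   # 1
    print(all(p is pools[0] for p in pools))  # True

asyncio.run(demo())
```

The re-check inside the lock is needed because acquiring the lock itself involves an await; with the real asyncpg.create_pool the shape would be identical.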
Now we run into another problem: every one of the 1000 tasks acquires a database connection to perform its insert. The performance of this is not great, and we have a couple of options for arranging how we write the results:
Collect the tasks into batches and execute them batch by batch.
Enqueue each result and run workers in a loop that pull results from the queue and write them to the database.
Let's look at the batching implementation:
async def run_batches(items, batch_size):
    await db.get_pool()
    # Split the items into batches
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    # Run one batch at a time, the tasks within a batch concurrently
    for batch in batches:
        await asyncio.gather(*batch)

def main():
    batch_size = 3
    asyncio.run(
        run_batches(
            [fake_result(i) for i in range(1000)],
            batch_size)
    )
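A variation on batching, offered here as a suggestion rather than part of the original code, is to cap concurrency with an asyncio.Semaphore instead of slicing the list: all tasks are scheduled at once, but at most limit of them run at a time. The fake_write stub below stands in for db.write so the sketch is runnable; it tracks peak concurrency to show the cap holds.

```python
import asyncio

in_flight = 0
max_in_flight = 0

async def fake_write(i):
    # Stand-in for db.write(); tracks how many writes run at once
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    await asyncio.sleep(0.001)
    in_flight -= 1

async def run_with_limit(coros, limit):
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    await asyncio.gather(*(guarded(c) for c in coros))

asyncio.run(run_with_limit([fake_write(i) for i in range(100)], 3))
print(max_in_flight)  # never exceeds 3
```

Unlike fixed batches, a new task starts as soon as any slot frees up, so one slow write does not stall the rest of its batch.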
We can improve on this with the latter approach, which uses workers and a queue. The benefit is that it caps the number of database connections in use at the number of workers, so we avoid exhausting the pool.
async def worker(queue):
    while True:
        job = await queue.get()
        if job is None:
            # Sentinel received: no more work
            queue.task_done()
            break
        try:
            await job
        finally:
            queue.task_done()

async def run_workers(jobs, num_workers):
    queue = asyncio.Queue()
    # At most num_workers connections are in use at any time
    workers = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]
    for job in jobs:
        await queue.put(job)
    # Send a signal to quit to each worker
    for _ in range(num_workers):
        await queue.put(None)
    # Wait until all enqueued jobs are processed
    await queue.join()
    # Cancel any worker tasks that are still running
    for worker_task in workers:
        worker_task.cancel()
    # Wait for the worker tasks to complete
    await asyncio.gather(*workers, return_exceptions=True)

def main():
    asyncio.run(run_workers([fake_result(i) for i in range(1000)], 3))
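As a self-contained check of the worker pattern, the sketch below swaps db.write for a stub (an assumption, not the original code) and confirms that every job is processed while no more than num_workers writes are ever in flight at once.

```python
import asyncio

processed = []
in_flight = 0
max_in_flight = 0

async def fake_write(i):
    # Stand-in for db.write(); tracks peak concurrency
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    await asyncio.sleep(0.001)
    in_flight -= 1
    processed.append(i)

async def worker(queue):
    while True:
        job = await queue.get()
        if job is None:  # sentinel: shut down
            queue.task_done()
            break
        try:
            await job
        finally:
            queue.task_done()

async def run_workers(jobs, num_workers):
    queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]
    for job in jobs:
        await queue.put(job)
    for _ in range(num_workers):
        await queue.put(None)
    await queue.join()
    await asyncio.gather(*workers)

asyncio.run(run_workers([fake_write(i) for i in range(100)], 3))
print(len(processed), max_in_flight)  # 100 jobs done, at most 3 concurrent
```

The calls to queue.task_done() are what let queue.join() return once every enqueued job (and sentinel) has been handled; without them the join would hang.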