python aiomysql query time increases with the number of parallel tasks

Question

I am querying a MySQL database with Python's aiomysql in an async application. I also use anyio's create_task_group to run parallel tasks. Sometimes I need to fetch data from the MySQL database in chunks, so to speed things up I create a task group and run the queries in parallel, but I ran into the problem that the time of a single query grows with the number of tasks. Here is a snippet that reproduces it:

import os
import aiomysql
from time import time
from anyio import run, create_task_group
from dotenv import load_dotenv

diffs = []  # contains each single query time
tot_reqs = 0  # count number of total requests


async def measure(pool):
    global diffs
    global tot_reqs

    async with pool.acquire() as conn:
        start_time = time()
        while True:
            t1 = time()
            cur = await conn.cursor()
            sql = "select * from ttable limit 10000"
            await cur.execute(query=sql)
            await cur.close()

            diffs.append(time() - t1)
            tot_reqs += 1
            if len(diffs) > 9:  # print mean time of every 10 queries
                print(
                    f"[{time()-start_time:.2f}s since started] Mean query time: {sum(diffs)/len(diffs)} (total requests performed: {tot_reqs})"
                )
                diffs = []


async def case():
    db_env_prefix = "SOME_MYSQL"
    pool = await aiomysql.create_pool(
        host=os.getenv(db_env_prefix + "_HOST"),
        port=int(os.getenv(db_env_prefix + "_PORT") or 3306),
        user=os.getenv(db_env_prefix + "_USERNAME"),
        password=os.getenv(db_env_prefix + "_PASSWORD"),
        db=os.getenv(db_env_prefix + "_DB"),
        maxsize=10,
        autocommit=True,
        pool_recycle=600,
    )
    async with create_task_group() as tg:
        for _ in range(10):
            tg.start_soon(measure, pool)
    pool.close()
    await pool.wait_closed()


if __name__ == "__main__":
    load_dotenv()
    run(case)

The printed output is:

[0.83s since started] Mean query time: 0.5825932741165161 (total requests performed: 10)
[2.95s since started] Mean query time: 0.989139986038208 (total requests performed: 20)
[4.23s since started] Mean query time: 1.238171124458313 (total requests performed: 30)
[5.53s since started] Mean query time: 1.2967675924301147 (total requests performed: 40)
[6.83s since started] Mean query time: 1.2985524892807008 (total requests performed: 50)
[8.14s since started] Mean query time: 1.3030725479125977 (total requests performed: 60)
[9.44s since started] Mean query time: 1.3051365852355956 (total requests performed: 70)
[10.75s since started] Mean query time: 1.3047129154205321 (total requests performed: 80)
[12.05s since started] Mean query time: 1.3064133167266845 (total requests performed: 90)
[13.36s since started] Mean query time: 1.3030510425567627 (total requests performed: 100)

So, by running 10 tasks in parallel for about 13 seconds, I completed about 100 requests with a mean query time of 1.3 s. Then I ran just one task:

    async with create_task_group() as tg:
        for _ in range(1):
            tg.start_soon(measure, pool)

and I got:

[1.24s since started] Mean query time: 0.12407510280609131 (total requests performed: 10)
[2.54s since started] Mean query time: 0.13026781082153321 (total requests performed: 20)
[3.85s since started] Mean query time: 0.13041894435882567 (total requests performed: 30)
[5.16s since started] Mean query time: 0.13082268238067626 (total requests performed: 40)
[6.47s since started] Mean query time: 0.13111093044281005 (total requests performed: 50)
[7.78s since started] Mean query time: 0.13118529319763184 (total requests performed: 60)
[9.09s since started] Mean query time: 0.1312186002731323 (total requests performed: 70)
[10.40s since started] Mean query time: 0.13101680278778077 (total requests performed: 80)
[11.72s since started] Mean query time: 0.13155796527862548 (total requests performed: 90)
[13.03s since started] Mean query time: 0.131210994720459 (total requests performed: 100)

The same 100 requests completed in about 13 seconds (a single query from a single task is 10x faster, with a mean query time of 0.13 s). There are no CPU-bound tasks, only IO requests.

So increasing the number of tasks does not actually improve query time: the more tasks, the slower each single query. I tried querying different tables and changing MySQL configuration, e.g. increasing innodb_thread_concurrency, innodb_read_io_threads, etc. It really does not look like a table/database configuration problem, because when I run the same code in several terminals simultaneously, single-query times do not degrade and behave the way I expected from the tg.start_soon approach. Multiprocessing would help, but it has its own drawbacks. Moreover, this is strange behavior for IO-bound tasks: nothing should be blocking the async requests.
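One plausible explanation (an assumption, consistent with multiple terminals and multiprocessing helping): aiomysql deserializes result rows in pure Python on the event-loop thread, so fetching 10,000 rows per query is actually CPU work, and ten concurrent tasks merely take turns on one core. A minimal sketch with no database at all, where the hypothetical `parse_rows` stands in for that per-query protocol parsing:

```python
import asyncio
import time


def parse_rows(n=300_000):
    # stand-in for the pure-Python row deserialization a driver performs
    # per query; it keeps the single event-loop thread busy
    return sum(i & 7 for i in range(n))


async def run_queries(num_tasks, queries_per_task):
    async def worker():
        for _ in range(queries_per_task):
            parse_rows()
            await asyncio.sleep(0)  # yield, as awaiting the socket would

    t0 = time.perf_counter()
    await asyncio.gather(*(worker() for _ in range(num_tasks)))
    return time.perf_counter() - t0


# same total work (10 "queries") either way: one task doing 10 queries
# takes about as long as 10 tasks doing 1 query each, because the loop
# can only run one parse at a time -- per-query latency grows ~10x
t_one = asyncio.run(run_queries(1, 10))
t_ten = asyncio.run(run_queries(10, 1))
```

Under this assumption, adding tasks cannot shorten the wall clock; it only divides the same single-core throughput among more concurrent queries, which matches the 0.13 s vs 1.3 s measurements above.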

Update: as requested in the comments:

MariaDB [(none)]> SELECT 'SLEEPING MDB Ram use', COUNT(*),SUM(time),SUM(memory_used),SUM(max_memory_used) FROM information_schema.processlist WHERE command="Sleep";
+----------------------+----------+-----------+------------------+----------------------+
| SLEEPING MDB Ram use | COUNT(*) | SUM(time) | SUM(memory_used) | SUM(max_memory_used) |
+----------------------+----------+-----------+------------------+----------------------+
| SLEEPING MDB Ram use |        1 |       309 |            79632 |               162472 |
+----------------------+----------+-----------+------------------+----------------------+
Tags: python, asynchronous, aio-mysql, anyio
1 Answer

Suggestions to consider for the [mysqld] section of your my.cnf:

thread_pool_size=6  # from 8 - the docs suggest at most 80% of your 8 cores
table_open_cache_instances=1  # from 8, until you have more than 1,000 tables
net_buffer_length=98304  # from 16384, to reduce packet in/out count
innodb_io_capacity=900  # from 200, to use more of your NVMe IOPS
tmp_table_size=33554432  # from 16M to 32M, to expand capacity
max_heap_table_size=33554432  # from 16M to 32M, to reduce created_tmp_disk_tables

There are more opportunities to improve performance; please view my profile.

I hope your queries complete more quickly. When streaming a set of queries through a single connection, you may not achieve the parallel processing you are hoping for. If time permits, please post the new completion times after implementing these changes.
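Since the question notes that multiprocessing helps (and that several independent processes do not slow each other down), one way to fetch chunks in true parallel is to split the row range across worker processes, each running its own event loop and pool. A sketch under those assumptions; the names (`make_chunks`, `fetch_chunk`, the `ttable` query) are illustrative, and the actual database call is elided:

```python
from concurrent.futures import ProcessPoolExecutor


def make_chunks(total_rows, chunk_size):
    # split [0, total_rows) into (offset, limit) pairs, one job per chunk
    return [
        (off, min(chunk_size, total_rows - off))
        for off in range(0, total_rows, chunk_size)
    ]


def fetch_chunk(args):
    offset, limit = args
    # in a real worker: create its own event loop and aiomysql pool, then run
    # "SELECT * FROM ttable LIMIT %s OFFSET %s" with (limit, offset) -- elided
    return offset, limit


if __name__ == "__main__":
    chunks = make_chunks(100_000, 10_000)
    # each worker process parses its own result sets on its own core
    with ProcessPoolExecutor(max_workers=4) as ex:
        results = list(ex.map(fetch_chunk, chunks))
```

The trade-off the question already mentions applies: processes cost startup time, separate connections, and result serialization back to the parent, so this pays off mainly for large chunked reads.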
