postgres 数据库中存在大量插入的 asyncpg 问题

问题描述 投票:0回答:1

在 postgres 中,我尝试将大量数据从一个表插入到另一个表中 - 大约 3000 万条记录。完成整个过程大约需要 40 分钟,因此我考虑将插入分成一系列 30 个较小的插入,并使用 asyncpg 这样的库异步执行它们。比如说,每个 INSERT INTO...SELECT 需要 1.5 分钟才能插入大约 100 万条记录,但是当使用 asyncpg 时,我没有看到任何异步行为。看起来插入似乎是一个接一个地连续发生的。稍微简化的示例代码如下。任何人都可以发现我做错了什么或建议替代方法。有可能做到这一点吗?预先感谢您的帮助。

import asyncpg
import asyncio

async def test():
    
    conn = await asyncpg.connect(user='myuser', password='mypassword',
                                 database='mydb', host='myhost')
    values = await conn.fetch('''select count(*) from t1''')
    rows=values[0][0]
    await conn.close()

# get a list of 30 non-overlapping start/end rows for the SELECTS
    lst=range(rows)
    nums=(lst[i+1:i + int(len(lst)/30)] for i in range(0, len(lst), int(len(lst)/30)))

    pool =   await asyncpg.create_pool(user='myuser', password='mypassword',
                                 database='mydb', host='myhost')

# The idea is that the below for loop should fire off 
# around 30 INSERT... SELECTS  asynchronously
    for num in nums:
        start=num[0]
        end=num[len(num)-1]+1

        sqlstr=f"""
            insert  into t2
            select 
                   data
            from
            (
                select data , row_number() over() rn
                from t1
            ) as foo where rn between {start} and {end}
        """

        async with pool.acquire() as con:
        await con.execute(sqlstr)

    await pool.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
python postgresql asynchronous asyncpg
1个回答
0
投票

(我知道这已经晚了,但有人可能会发现它有用。)

当您使用“await”调用异步函数时,asyncio 正是这样做的:它会等待直到函数返回。在您的代码中,对于每个异步插入语句,您都在开始下一个插入之前“等待”它 - 因此它们按顺序运行而不是并行运行,并且您无法从异步中获得任何好处。

你想做的是这样的:

async with pool.acquire() as con:
    tasks = []
    for num in nums:
        start=num[0]
        end=num[len(num)-1]+1

        sqlstr=f"""
        insert  into t2
        select 
            data
        from
        (
            select data , row_number() over() rn
            from t1
        ) as foo where rn between {start} and {end}
        """

        tasks.append(asyncio.create_task(
            con.execute(sqlstr))
        )

await asyncio.gather(*tasks)
await pool.close()

注意 con.execute(sqlstr) 被调用为 withoutawait 上面。这意味着当您的方法继续执行时,它将开始自行执行。但是,我们将调用包装在 create_task() 中,这样我们就可以跟踪它。

最后,我们使用 asyncio.gather() 将所有 30 个任务合并为一个大任务,只有当它的 30 个子任务全部返回时,该任务才会返回;我们等待重大任务,确保在整个工作完成之前我们不会继续编写代码。

© www.soinside.com 2019 - 2024. All rights reserved.