在 postgres 中,我尝试将大量数据从一个表插入到另一个表中 - 大约 3000 万条记录。完成整个过程大约需要 40 分钟,因此我考虑将插入分成一系列 30 个较小的插入,并使用 asyncpg 这样的库异步执行它们。比如说,每个 INSERT INTO...SELECT 需要 1.5 分钟才能插入大约 100 万条记录,但是当使用 asyncpg 时,我没有看到任何异步行为。看起来插入似乎是一个接一个地连续发生的。稍微简化的示例代码如下。任何人都可以发现我做错了什么或建议替代方法。有可能做到这一点吗?预先感谢您的帮助。
import asyncpg
import asyncio
async def test():
conn = await asyncpg.connect(user='myuser', password='mypassword',
database='mydb', host='myhost')
values = await conn.fetch('''select count(*) from t1''')
rows=values[0][0]
await conn.close()
# get a list of 30 non-overlapping start/end rows for the SELECTS
lst=range(rows)
nums=(lst[i+1:i + int(len(lst)/30)] for i in range(0, len(lst), int(len(lst)/30)))
pool = await asyncpg.create_pool(user='myuser', password='mypassword',
database='mydb', host='myhost')
# The idea is that the below for loop should fire off
# around 30 INSERT... SELECTS asynchronously
for num in nums:
start=num[0]
end=num[len(num)-1]+1
sqlstr=f"""
insert into t2
select
data
from
(
select data , row_number() over() rn
from t1
) as foo where rn between {start} and {end}
"""
async with pool.acquire() as con:
await con.execute(sqlstr)
await pool.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
(我知道这已经晚了,但有人可能会发现它有用。)
当您使用“await”调用异步函数时,asyncio 正是这样做的:它会等待直到函数返回。在您的代码中,对于每个异步插入语句,您都在开始下一个插入之前“等待”它 - 因此它们按顺序运行而不是并行运行,并且您无法从异步中获得任何好处。
你想做的是这样的:
async with pool.acquire() as con:
tasks = []
for num in nums:
start=num[0]
end=num[len(num)-1]+1
sqlstr=f"""
insert into t2
select
data
from
(
select data , row_number() over() rn
from t1
) as foo where rn between {start} and {end}
"""
tasks.append(asyncio.create_task(
con.execute(sqlstr))
)
await asyncio.gather(*tasks)
await pool.close()
注意 con.execute(sqlstr) 被调用为 withoutawait 上面。这意味着当您的方法继续执行时,它将开始自行执行。但是,我们将调用包装在 create_task() 中,这样我们就可以跟踪它。
最后,我们使用 asyncio.gather() 将所有 30 个任务合并为一个大任务,只有当它的 30 个子任务全部返回时,该任务才会返回;我们等待重大任务,确保在整个工作完成之前我们不会继续编写代码。