我正在尝试以尽可能最少的方式将数据帧安全地异步写入我的 PostgreSQL 数据库。我正在努力理解与此相关的文档,并希望这里有人可以提供帮助。
我有以下代码:
from datetime import datetime
import pandas as pd
import asyncio
x = 0
async def write_sql(engine):
global x
x += 1
print(f"Running {x}")
d = {'col1': [1, 2, 3, 4], 'col2': [3, 4, 5, 6]}
df = pd.DataFrame(data=d)
df.to_sql(con=engine, name="test_data", if_exists="replace", index=False)
await asyncio.sleep(1)
async def main():
engine = create_engine('postgresql+asyncpg://user:pass@localhost:5432/postgres')
t1 = datetime.now()
await asyncio.gather(write_sql(engine),write_sql(engine),write_sql(engine))
t2 = datetime.now()
delta = t2 - t1
print(f"Took {delta} seconds")
if __name__=='__main__':
asyncio.run(main())
print("Finished.")
这有效,但是由于我没有使用 create_async_engine() 我担心这可能不是解决这个问题的最佳方法。更改代码以使用 create_async_engine() 相反会引发“AsyncEngine 对象没有光标”错误。
这里有人可以解释一下吗?
您的代码根本没有从异步中获得任何好处:唯一会执行大量 I/O 并因此能够释放异步循环以执行其他任务的调用是
df.to_sql
- 而且它是同步的并且处于开启状态同一个线程。额外的“await asyncio.sleep(1)`只会让你的函数在写入完成后额外挂起一秒。(如果你需要一些冷的安慰,其他异步任务可以在这1秒上运行)。
如果您没有更改任何其他内容,并且如果您有其他一些实际上可以工作的异步代码,则可以使用异步
loop.run_in_executor
调用:这将使同步to_sql
方法在另一个线程中运行,并且主线程线程被释放以用于其他任务。
from functools import partial
...
async def write_sql(engine):
global x
x += 1
print(f"Running {x}")
d = {'col1': [1, 2, 3, 4], 'col2': [3, 4, 5, 6]}
df = pd.DataFrame(data=d)
task_code = partial(df.to_sql, name="test_data", if_exists="replace", index=False)
loop = asynco.get_running_loop()
await loop.run_in_executor(None, task_code)
# drops the useless asyncio.sleep
但是,正如您所说,您希望在多线程代码中无法保证“安全执行”:如果您有并发代码 - 无论是在其他异步任务中还是在写入时修改目标数据帧的线程中,这还不够.
我也对这个问题的明确答案感兴趣,但我很难。尽管如此,我的解决方案只是在运行
multiprocessing.Queue()
之前创建另一个线程补充 asyncio.run()
,然后使用 queue.put(item)
从事件循环内将批量数据发送到该队列。与此同时,工作线程根据输入值运行df.to_sql()
。
@jsbueno 的答案也是一个选项,但是当你必须执行多次插入时,我发现它有问题,因为它启动了尽可能多的线程。