尝试使用 SQLAlchemy 异步 pd.to_sql()

问题描述 投票:0回答:2

我正在尝试以尽可能最少的方式将数据帧安全地异步写入我的 PostgreSQL 数据库。我正在努力理解与此相关的文档,并希望这里有人可以提供帮助。

我有以下代码:

from datetime import datetime
import pandas as pd 
import asyncio

x = 0

async def write_sql(engine):
    global x 
    x += 1
    print(f"Running {x}") 
    d = {'col1': [1, 2, 3, 4], 'col2': [3, 4, 5, 6]}
    df = pd.DataFrame(data=d)   
    df.to_sql(con=engine, name="test_data", if_exists="replace", index=False)
    await asyncio.sleep(1)


async def main():    
    engine = create_engine('postgresql+asyncpg://user:pass@localhost:5432/postgres')
    t1 = datetime.now() 
    await asyncio.gather(write_sql(engine),write_sql(engine),write_sql(engine))
    t2 = datetime.now()
    delta = t2 - t1
    print(f"Took {delta} seconds")


if __name__=='__main__': 
    asyncio.run(main()) 
    print("Finished.")

这有效,但是由于我没有使用 create_async_engine() 我担心这可能不是解决这个问题的最佳方法。更改代码以使用 create_async_engine() 相反会引发“AsyncEngine 对象没有光标”错误。

这里有人可以解释一下吗?

python pandas sqlalchemy python-asyncio
2个回答
0
投票

您的代码根本没有从异步中获得任何好处:唯一会执行大量 I/O 并因此能够释放异步循环以执行其他任务的调用是

df.to_sql
- 而且它是同步的并且处于开启状态同一个线程。额外的“await asyncio.sleep(1)`只会让你的函数在写入完成后额外挂起一秒。(如果你需要一些冷的安慰,其他异步任务可以在这1秒上运行)。

如果您没有更改任何其他内容,并且如果您有其他一些实际上可以工作的异步代码,则可以使用异步

loop.run_in_executor
调用:这将使同步
to_sql
方法在另一个线程中运行,并且主线程线程被释放以用于其他任务。

from functools import partial
...
async def write_sql(engine):
    global x 
    x += 1
    print(f"Running {x}") 
    d = {'col1': [1, 2, 3, 4], 'col2': [3, 4, 5, 6]}
    df = pd.DataFrame(data=d)   
    task_code = partial(df.to_sql,  name="test_data", if_exists="replace", index=False)
    loop = asynco.get_running_loop()
    await loop.run_in_executor(None, task_code)
    # drops the useless asyncio.sleep

但是,正如您所说,您希望在多线程代码中无法保证“安全执行”:如果您有并发代码 - 无论是在其他异步任务中还是在写入时修改目标数据帧的线程中,这还不够.


0
投票

我也对这个问题的明确答案感兴趣,但我很难。尽管如此,我的解决方案只是在运行

multiprocessing.Queue()
之前创建另一个线程补充
asyncio.run()
,然后使用
queue.put(item)
从事件循环内将批量数据发送到该队列。与此同时,工作线程根据输入值运行
df.to_sql()

@jsbueno 的答案也是一个选项,但是当你必须执行多次插入时,我发现它有问题,因为它启动了尽可能多的线程。

© www.soinside.com 2019 - 2024. All rights reserved.