我如何通过psycopg2或asyncpg将Postgres连接池传递给Dask工作者?

问题描述 投票:0回答:1

我希望我的Dask工作者从ThreadedConnectionPool抓取Postgres连接,但是当像这样通过池时>

from psycopg2.pool import ThreadedConnectionPool

def worker_pg(n, pool) -> None:
    print(n)

work = db.from_sequence(range(4))
tcp = ThreadedConnectionPool(1, 800, "db_string")

work.map(worker_pg, pool=tcp).compute()

我收到序列化错误,例如:

TypeError: ('Could not serialize object of type ThreadedConnectionPool.', '<psycopg2.pool.ThreadedConnectionPool object at 0x7f99dc57b128>')

而且,当我一直在尝试使用psycopg2进行此操作时,我也真的很想将其与asyncpg一起使用(出于性能原因)。但是,使用await]中的asyncasyncio有更多的麻烦

import asyncio
import asyncpg

async def get_pool():
    p = await asyncpg.create_pool("db_string")
    return p

pool = asyncio.get_event_loop().run_until_complete(get_pool())

work.map(worker_pg, pool=pool).compute()

尽管我似乎确实会遇到相同类型的错误,例如

TypeError: ('Could not serialize object of type Pool.', '<asyncpg.pool.Pool object at 0x7fdee9127818>')

任何建议(或替代方案?)都非常感谢!

我希望我的Dask工作者从ThreadedConnectionPool捕获Postgres连接,但是当像从psycopg2.pool那样传递池时,请导入ThreadedConnectionPool def worker_pg(n,pool)-> ...

python-asyncio psycopg2 dask dask-distributed asyncpg
1个回答
0
投票
如注释中所建议,您可能考虑让每个任务打开与Postgres的连接,执行查询,然后关闭该连接。

很遗憾,Dask无法在计算机之间移动活动的数据库连接。这些对象与启动它们的过程紧密相关。

© www.soinside.com 2019 - 2024. All rights reserved.