我希望我的Dask工作者从ThreadedConnectionPool
抓取Postgres连接,但是当像这样通过池时>
from psycopg2.pool import ThreadedConnectionPool def worker_pg(n, pool) -> None: print(n) work = db.from_sequence(range(4)) tcp = ThreadedConnectionPool(1, 800, "db_string") work.map(worker_pg, pool=tcp).compute()
我收到序列化错误,例如:
TypeError: ('Could not serialize object of type ThreadedConnectionPool.', '<psycopg2.pool.ThreadedConnectionPool object at 0x7f99dc57b128>')
而且,当我一直在尝试使用
psycopg2
进行此操作时,我也真的很想将其与asyncpg
一起使用(出于性能原因)。但是,使用await
]中的async
和asyncio
有更多的麻烦import asyncio import asyncpg async def get_pool(): p = await asyncpg.create_pool("db_string") return p pool = asyncio.get_event_loop().run_until_complete(get_pool()) work.map(worker_pg, pool=pool).compute()
尽管我似乎确实会遇到相同类型的错误,例如
TypeError: ('Could not serialize object of type Pool.', '<asyncpg.pool.Pool object at 0x7fdee9127818>')
任何建议(或替代方案?)都非常感谢!
我希望我的Dask工作者从ThreadedConnectionPool捕获Postgres连接,但是当像从psycopg2.pool那样传递池时,请导入ThreadedConnectionPool def worker_pg(n,pool)-> ...
很遗憾,Dask无法在计算机之间移动活动的数据库连接。这些对象与启动它们的过程紧密相关。