如何在Dask分布式上运行SQLAlchemy查询?

问题描述 投票:0回答:1

我试图使用我建立的dask集群运行和并行化这个sqlalchemy查询,因为我没有足够的内存从我的本地计算机执行它。

我的代码如下--我不确定这是否是实现这一目标的最佳方式。

from dask.distributed import Client
import dask.dataframe as dd
from dask.delayed import delayed
client = Client(<IP Address>)

recent_dates = ['2020-04-24', '2020-04-23', 2020-04-22']

query = """SELECT * FROM table WHERE date = '%s'"""
queries = [query.format(d) for d in recent_dates]

from sqlalchemy.engine import create_engine
conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                           connect_args={'protocol': 'https',
                                         'requests_kwargs': {'verify': key}})

con = engine.connect()
df = dd.from_delayed([delayed(pd.read_sql_query)(q, conn) for q in queries])

我得到了以下错误信息:

TypeError: can't pickle _thread.RLock objects
python sqlalchemy dask dask-distributed dask-dataframe
1个回答
1
投票

你应该使用函数 read_sql_table,就是为了这个目的而制作的。如果你读了docstrings和代码,你会发现是查询本身被传递给了worker,而worker在本地创建了自己的引擎实例。这是因为 sqlalchemy 实例的状态不能在 Worker 之间发送,你已经发现了。

注意 read_sql_table 也关心您的数据分区,因为这是 Dask,整个要点是处理比内存大的数据。在您的例子中,我猜测非分区列是 date而你想通过 "分割 "来明确地分割。

© www.soinside.com 2019 - 2024. All rights reserved.