我有一些 parquet
文件,这些文件都来自同一个域,但结构上有些不同。我需要将它们全部连接起来。下面是这些文件的一些例子。
file 1:
A,B
True,False
False,False
file 2:
A,C
True,False
False,True
True,True
我想做的是以最快的方式读取和连接这些文件,得到以下结果。
A,B,C
True,False,NaN
False,False,NaN
True,NaN,False
False,NaN,True
True,NaN,True
为了达到这个目的,我使用了下面的代码,是用(使用Dask读取多个文件, Dask数据框:读取多个文件&将文件名存储在列中。):
import glob
import dask.dataframe as dd
from dask.distributed import Client
import dask
def read_parquet(path):
return pd.read_parquet(path)
if __name__=='__main__':
files = glob.glob('test/*/file.parquet')
print('Start dask client...')
client = Client()
results = [dd.from_delayed(dask.delayed(read_parquet)(diag)) for diag in diag_files]
results = dd.concat(results).compute()
client.close()
这段代码可以用,而且已经是我能想到的最快的版本了(我试过顺序的 pandas
和 multiprocessing.Pool
). 我的想法是,Dask 理想情况下可以在读取部分文件的同时开始部分的连通,然而,从任务图中我看到了一些顺序读取每个parquet文件的元数据,见下面的截图。
任务图的第一部分是由以下几个部分混合而成的 read_parquet
其次 read_metadata
. 第一部分总是只显示1个任务的执行情况(在任务处理选项卡中)。第二部分是下列任务的组合 from_delayed
和 concat
而且它使用了我所有的工人。
有什么建议可以加快文件读取速度,减少图的第一部分的执行时间?
你的代码的问题是,你使用了 熊猫 版读取parquet.
而不是使用。
类似的东西。
def read_parquet(path):
return dd.read_parquet(path)
def myRead():
L = client.map(read_parquet, glob.glob('file_*.parquet'))
lst = client.gather(L)
return dd.concat(lst)
result = myRead().compute()
在此之前,我创造了一个 客户原因是在我之前的实验中,当我试图再次创建它(在一个函数中)时,我收到了一个错误信息,尽管第一个实例之前已经被关闭。