Python和Dask - 读取和连接多个文件。

问题描述 投票:0回答:1

我有一些 parquet 文件,这些文件都来自同一个域,但结构上有些不同。我需要将它们全部连接起来。下面是这些文件的一些例子。

file 1:
A,B
True,False
False,False

file 2:
A,C
True,False
False,True
True,True

我想做的是以最快的方式读取和连接这些文件,得到以下结果。

A,B,C
True,False,NaN
False,False,NaN
True,NaN,False
False,NaN,True
True,NaN,True

为了达到这个目的,我使用了下面的代码,是用(使用Dask读取多个文件, Dask数据框:读取多个文件&将文件名存储在列中。):

import glob

import dask.dataframe as dd
from dask.distributed import Client
import dask

def read_parquet(path):
    return pd.read_parquet(path)

if __name__=='__main__':

    files = glob.glob('test/*/file.parquet')

    print('Start dask client...')
    client = Client()

    results = [dd.from_delayed(dask.delayed(read_parquet)(diag)) for diag in diag_files]

    results = dd.concat(results).compute()

    client.close()

这段代码可以用,而且已经是我能想到的最快的版本了(我试过顺序的 pandasmultiprocessing.Pool). 我的想法是,Dask 理想情况下可以在读取部分文件的同时开始部分的连通,然而,从任务图中我看到了一些顺序读取每个parquet文件的元数据,见下面的截图。task graph dask

任务图的第一部分是由以下几个部分混合而成的 read_parquet 其次 read_metadata. 第一部分总是只显示1个任务的执行情况(在任务处理选项卡中)。第二部分是下列任务的组合 from_delayedconcat 而且它使用了我所有的工人。

有什么建议可以加快文件读取速度,减少图的第一部分的执行时间?

python dask dask-delayed
1个回答
0
投票

你的代码的问题是,你使用了 熊猫读取parquet.

而不是使用。

  • dask读取parquet,
  • 地图聚集 提供的方法 客户,
  • 掩码吻合,

类似的东西。

def read_parquet(path):
    return dd.read_parquet(path)

def myRead():
    L = client.map(read_parquet, glob.glob('file_*.parquet'))
    lst = client.gather(L)
    return dd.concat(lst)

result = myRead().compute()

在此之前,我创造了一个 客户原因是在我之前的实验中,当我试图再次创建它(在一个函数中)时,我收到了一个错误信息,尽管第一个实例之前已经被关闭。

© www.soinside.com 2019 - 2024. All rights reserved.