并行读取 pandas 数据框中的多个 csv 文件

问题描述 投票:0回答:1

这个问题是前一个问题的发展:

读取 pandas 数据框中的多个 csv 文件

基本上,我有一组文件,例如:

文件1:

<empty line>
#-----------------------------------------
#    foo  bar  baz  
#-----------------------------------------
    0.0120932       1.10166       1.08745 
    0.0127890       1.10105       1.08773 
    0.0142051       1.09941       1.08760 
    0.0162801       1.09662       1.08548 
    0.0197376       1.09170       1.08015 

文件2:

<empty line>
#-----------------------------------------
#    foo  bar  baz  
#-----------------------------------------
    0.888085      0.768590      0.747961
    0.893782      0.781607      0.760417
    0.899830      0.797021      0.771219
    0.899266      0.799260      0.765859
    0.891489      0.781255      0.728892

等等。每个文件都由一个 ID 来标识,并且有一个 ID 到文件的映射:

files = {'A': 'A.csv', 'B': 'B.csv'}

感谢其他答案,我可以连续读取文件:

columns = ['foo', 'bar', 'baz']
skip = 4
df = (pd.concat({k: pd.read_csv(v, skiprows=skip, sep=r'\s+', names=names)
                 for k,v in files.items()},
                names=['ID'])
        .reset_index('ID')
        .reset_index(drop=True)
    )

但是,我想并行阅读它们,以利用我的多核机器。天真的尝试是行不通的:

from joblib import Parallel, delayed
from multiprocessing import cpu_count

n_jobs = cpu_count()


def read_file(res_dict: dict,
                          skiprows: int,
                          columns: list[str],
                          id: str,
                          file: Path
                          ) -> None:
    res_dict[id] = pd.read_csv(file, skiprows=skiprows, sep=r'\s+', names=columns)


temp = {}
temp = Parallel(n_jobs)(delayed(read_file)(temp, skip_rows, columns, id, file) for id, file in master2file.items())
df = (pd.concat(temp,
                names=['ID'])
      .reset_index('ID')
      .reset_index(drop=True)
      )

我收到错误

Traceback (most recent call last):
  File "/home/...py", line 54, in <module>
    df = (pd.concat(temp,
  File "/home/../.venv/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 372, in concat
    op = _Concatenator(
  File "/home/../.venv/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 452, in __init__
    raise ValueError("All objects passed were None")
ValueError: All objects passed were None

Process finished with exit code 1

我做错了什么?你能帮我吗?

python pandas csv parallel-processing joblib
1个回答
0
投票

这是代码的功能版本。我没有使用字典,而是直接返回数据帧(因为您已经将 ID 作为

master2file
的键)。我将键作为参数传递给
concat
,而不是使用字典。

from joblib import Parallel, delayed
from multiprocessing import cpu_count

n_jobs = cpu_count()

master2file = {'A': 'A.csv', 'B': 'B.csv'}
skip_rows = 4
columns = ['foo', 'bar', 'baz']

def read_file(skiprows: int,
              columns: list[str],
              id: str,
              file: Path
              ) -> None:
    return pd.read_csv(file, skiprows=skiprows, sep=r'\s+', names=columns)


temp = Parallel(n_jobs)(delayed(read_file)(skip_rows, columns, id, file)
                                           for id, file in master2file.items())
df = (pd.concat(temp,
                keys=list(master2file),
                names=['ID'])
      .reset_index('ID')
      .reset_index(drop=True)
      )

输出:

  ID       foo       bar       baz
0  A  0.012093  1.101660  1.087450
1  A  0.012789  1.101050  1.087730
2  A  0.014205  1.099410  1.087600
3  A  0.016280  1.096620  1.085480
4  A  0.019738  1.091700  1.080150
5  B  0.888085  0.768590  0.747961
6  B  0.893782  0.781607  0.760417
7  B  0.899830  0.797021  0.771219
8  B  0.899266  0.799260  0.765859
9  B  0.891489  0.781255  0.728892
© www.soinside.com 2019 - 2024. All rights reserved.