PySpark:如何最有效地读取具有不同列位置的多个 CSV 文件

问题描述 投票:0回答:1

我正在尝试使用 Spark 以最有效的方式读取目录中的多个 CSV 文件。不幸的是,除了单独读取每个文件之外,我还没有找到更好的方法来做到这一点,这非常耗时。

据我了解,读取多个CSV文件最有效的方法是使用

*
,如下所示:

df = spark.read.format('csv') \
        .option('header', 'true') \
        .load('/path/to/csv/folder/*.csv')

但是,虽然速度非常快,但它并不是按列名进行并集,而是按照列索引进行并集。 例如,如果目录包含以下两个 CSV 文件:

1.csv

A B C
1 2 5
3 4 6

2.csv

A C
7 8

之前的操作将像这样合并它们:

df

A B C
1 2 5
3 4 6
7 8

这显然是不正确,因为最后一行应该是

7|NULL|8

嗯,我可以通过单独读取每个文件,然后执行

unionByName
并将
allowMissingColumns
参数设置为
True
来解决这个问题,如下所示:

dfs = []
for filename in list_file_names('/path/to/csv/folder'):
    dfs.append(spark.read.format('csv') \
        .option('header', 'true') \
        .load('/path/to/csv/folder/{filename}')
    )
union_df = dfs[0]
for df in dfs[1:]:
    union_df = union_df.unionByName(df, allowMissingColumns=True)

这按预期工作,但是当我单独读取每个文件时,它要慢得多。对于同一台机器上的 hdfs 中的 100 个小 CSV 文件,第一个(但错误的)方法在大约 6 秒内完成,而第二个则需要 16 秒

所以我的问题是,我可以在 PySpark 中通过仅执行一次读取操作来达到与第一种方法相同的结果吗?

python csv apache-spark pyspark apache-spark-sql
1个回答
0
投票

我可以通过仅执行一次读取操作在 PySpark 中获得相同的结果吗

遗憾的是,正如您所注意到的,由于架构合并限制,您无法一次性使用 Spark 数据源 api。

相反,您可以通过首先读取每个文件的标头,按 csv 类别对它们进行分组,然后按路径合并每个文件类别来优化联合方法。

获取与文件路径关联的所有第一行可以在纯Python中实现例如boto

然后可以使用逗号分隔的路径列表来一次性读取文件列表。

虽然分两步,但如果您的 csc 类别很少,这应该比合并每个文件要快得多。

© www.soinside.com 2019 - 2024. All rights reserved.