我正在尝试使用 Spark 以最有效的方式读取目录中的多个 CSV 文件。不幸的是,除了单独读取每个文件之外,我还没有找到更好的方法来做到这一点,这非常耗时。
据我了解,读取多个CSV文件最有效的方法是使用
*
,如下所示:
df = spark.read.format('csv') \
.option('header', 'true') \
.load('/path/to/csv/folder/*.csv')
但是,虽然速度非常快,但它并不是按列名进行并集,而是按照列索引进行并集。 例如,如果目录包含以下两个 CSV 文件:
1.csv
:
A | B | C |
---|---|---|
1 | 2 | 5 |
3 | 4 | 6 |
2.csv
:
A | C |
---|---|
7 | 8 |
之前的操作将像这样合并它们:
df
:
A | B | C |
---|---|---|
1 | 2 | 5 |
3 | 4 | 6 |
7 | 8 | 空 |
这显然是不正确,因为最后一行应该是
7|NULL|8
。
嗯,我可以通过单独读取每个文件,然后执行
unionByName
并将 allowMissingColumns
参数设置为 True
来解决这个问题,如下所示:
dfs = []
for filename in list_file_names('/path/to/csv/folder'):
dfs.append(spark.read.format('csv') \
.option('header', 'true') \
.load('/path/to/csv/folder/{filename}')
)
union_df = dfs[0]
for df in dfs[1:]:
union_df = union_df.unionByName(df, allowMissingColumns=True)
这按预期工作,但是当我单独读取每个文件时,它要慢得多。对于同一台机器上的 hdfs 中的 100 个小 CSV 文件,第一个(但错误的)方法在大约 6 秒内完成,而第二个则需要 16 秒。
所以我的问题是,我可以在 PySpark 中通过仅执行一次读取操作来达到与第一种方法相同的结果吗?