读取多个 CSV 文件,每个 CSV 文件的列数不同

问题描述 投票:0回答:3

我想使用 PySpark 读取具有不同列数的多个 CSV 文件。

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']

f1 文件有 50 列,f2 还有 10 列,总共 60 列,f3 还有 30 列,f3 文件总共 80 列,依此类推。

但是,

df = spark.read.csv(Files,header=True)

仅提供 50 列。我期待 80 列。由于f1文件只有50列,因此剩余的30列将填充f1文件数据的NAN值。其他 CSV 文件也是如此。 Pandas 数据框完美地为我提供了所有 80 列:

import pandas as pd
import glob
df = pd.concat(map(pd.read_csv, ['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']))

但我不能用 PySpark 做同样的事情。如何将上述 5 个 CSV 文件的所有列读取到单个 Spark 数据帧中?

pyspark apache-spark-sql
3个回答
2
投票

您可以将每个文件读入其自己的 Spark 数据帧,要将所有数据帧合并为一个数据帧,请使用 union。

用更少的列填充数据框中缺失的列。

使用 union 或 reduce 合并它们。

from functools import reduce
from pyspark.sql.functions import lit, col

df_list = [spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6)]

cols = [len(df.columns) for df in df_list]
max_cols = max(cols)

df_list = [df.select(*[col(c) for c in df.columns] + [lit(None).alias("col_{}".format(i+j)) for i in range(len(df.columns), max_cols)]) for j, df in enumerate(df_list)]

df_final = reduce(lambda x, y: x.union(y), df_list)

我在这个github上复制了你的案例。


1
投票

这是一个非常简单的修复。我做了什么,

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']
Files.reverse()
df = spark.read.csv(Files,inferSchema=True, header=True)

最后一个文件包含所有列,因为列是增量添加的。逆转它们解决了问题。


0
投票

在 Spark 的最新版本(当前为 3.4.1)中,添加 mergeSchema 选项可以有效地允许后来的、更宽的数据帧与更薄的先前数据帧完全集成。

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']

df = (
    spark.read.csv(Files) 
    .option("header", True)
    .option("inferSchema", True)
    .option("mergeSchema", True)
)
© www.soinside.com 2019 - 2024. All rights reserved.