我在 S3 上有两个
csv
文件:
# a1.csv
a,b
3,4
和
# b2.csv
a,c
1,"text"
我想立即读取它们,确保最终的数据帧包含所有文件中的所有列,如下所示:
+---+----+----+
| a| b| c|
+---+----+----+
| 1|null|text|
| 3| 4|null|
+---+----+----+
我尝试了
inferSchema
和 schema
选项,但它们没有提供我期望的结果。
选项 1:
df = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.csv("s3a://test/*.csv")\
.show()
+---+----+
| a| c|
+---+----+
| 1|text|
| 3| 4|
+---+----+
选项2:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("a", IntegerType(), False)
,StructField("b", IntegerType(), True)
,StructField("c", StringType(), True)
])
df = spark.read\
.option("header", True)\
.schema(schema)\
.csv("s3a://test/*.csv")\
.show()
+---+----+----+
| a| b| c|
+---+----+----+
| 1|null|null|
| 3| 4|null|
+---+----+----+
有什么想法可以实现这一目标吗?
如果文件格式是镶木地板,那么我们可以使用 mergeSchema 选项通过指向包含多个文件的文件夹来轻松合并架构,但对于 CSV 文件,我们没有该选项。
您可以使用 unionByName 函数来获得所需的结果。
迭代文件夹并将文件读入数据帧,然后您可以调用 unionByName 函数。
df1.unionByName(df2, True)