Spark 2.0 隐式编码器,当类型为 Option[Seq[String]] (scala) 时处理缺失列

问题描述 投票:0回答:2

当我们的数据源中缺少某些 Option[Seq[String]] 类型的列时,我在编码数据时遇到一些问题。理想情况下,我希望用

None
填充缺失的列数据。

场景:

我们正在读取一些镶木地板文件,其中有 column1 但没有 column2

我们将这些 parquet 文件中的数据加载到

Dataset
中,并将其转换为
MyType

case class MyType(column1: Option[String], column2: Option[Seq[String]])

sqlContext.read.parquet("dataSource.parquet").as[MyType]

org.apache.spark.sql.AnalysisException:无法解析给定输入列的“

column2
”:[column1];

有没有办法创建第2列数据为

None
的数据集?

scala apache-spark apache-spark-dataset
2个回答
10
投票

在简单的情况下,您可以提供一个初始模式,它是预期模式的超集。例如您的情况:

val schema = Seq[MyType]().toDF.schema

Seq("a", "b", "c").map(Option(_))
  .toDF("column1")
  .write.parquet("/tmp/column1only")

val df = spark.read.schema(schema).parquet("/tmp/column1only").as[MyType]
df.show
+-------+-------+
|column1|column2|
+-------+-------+
|      a|   null|
|      b|   null|
|      c|   null|
+-------+-------+
df.first
MyType = MyType(Some(a),None)

这种方法可能有点脆弱,所以一般来说你应该使用 SQL 文字来填补空白:

spark.read.parquet("/tmp/column1only")
  // or ArrayType(StringType)
  .withColumn("column2", lit(null).cast("array<string>"))
  .as[MyType]
  .first
MyType = MyType(Some(a),None)

0
投票

从 Spark 3.1 开始,您可以使用

allowMissingColumns
参数
Dataset#unionByName
来引入缺失的列:

spark.read.parquet("dataSource.parquet")
  // Add extra column by joining with empty dataframe using the expected schema
  .unionByName(spark.createDataFrame(sc.emptyRDD[Row], schema), true)
  // Extra check to see if schema really matches
  .as(RowEncoder.apply(schema))
© www.soinside.com 2019 - 2024. All rights reserved.