我正在从一个给定的 RDD[String]
并试图将其转化为 Dataset
与特定 case class
. 然而,当JSON字符串没有包含所有需要的字段时,就会出现 case class
我得到一个异常,说找不到缺少的列。
我如何为这种情况定义默认值?
我试着在 case class
但这并没有解决这个问题。我使用的是Spark 2.3.2和Scala 2.11.12。
这段代码工作正常
import org.apache.spark.rdd.RDD
case class SchemaClass(a: String, b: String)
val jsonData: String = """{"a": "foo", "b": "bar"}"""
val jsonRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonData))
import spark.implicits._
val ds = spark.read.json(jsonRddString).as[SchemaClass]
当我运行这段代码时
val jsonDataIncomplete: String = """{"a": "foo"}"""
val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))
import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]
dsIncomplete.printSchema()
dsIncomplete.show()
我得到以下异常
org.apache.spark.sql.AnalysisException: cannot resolve '`b`' given input columns: [a];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
[...]
有趣的是,当从文件中解析json字符串时,会应用默认值 "null",就像Spark文档中给出的例子那样,在 数据集 是显示。
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
json文件的内容
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
现在,你可以跳过以RDD加载json,然后以DF的方式读取,直接将JSON字符串转换为数据集。val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)
如果你使用的是Spark 2.2以上的版本
lit(null).cast(col.dataType)
为缺失的列。import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
object DefaultFieldValue {
def main(args: Array[String]): Unit = {
val spark = Constant.getSparkSess
import spark.implicits._
val jsonDataIncomplete: String = """{"a": "foo"}"""
val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)
val schema: StructType = Encoders.product[SchemaClass].schema
val fields: Array[StructField] = schema.fields
val outdf = fields.diff(dsIncomplete.columns).foldLeft(dsIncomplete)((acc, col) => {
acc.withColumn(col.name, lit(null).cast(col.dataType))
})
outdf.printSchema()
outdf.show()
}
}
case class SchemaClass(a: String, b: Int, c: String, d: Double)
package spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, lit}
object JsonDF extends App {
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
case class SchemaClass(a: String, b: Int)
val jsonDataIncomplete: String = """{"a": "foo", "m": "eee"}"""
val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))
val dsIncomplete = spark.read.json(jsonIncompleteRddString) // .as[SchemaClass]
lazy val schema: StructType = Encoders.product[SchemaClass].schema
lazy val fields: Array[String] = schema.fieldNames
lazy val colNames: Array[Column] = fields.map(col(_))
val sch = dsIncomplete.schema
val schemaDiff = schema.diff(sch)
val rr = schemaDiff.foldLeft(dsIncomplete)((acc, col) => {
acc.withColumn(col.name, lit(null).cast(col.dataType))
})
val schF = dsIncomplete.schema
val schDiff = schF.diff(schema)
val rrr = schDiff.foldLeft(rr)((acc, col) => {
acc.drop(col.name)
})
.select(colNames: _*)
}
如果您在同一个 RDD 中有不同的 json 字符串,它也会以同样的方式工作。当您只有一个字符串与模式不匹配时,它将抛出错误。
例如
val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete, jsonData))
import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]
dsIncomplete.printSchema()
dsIncomplete.show()
scala> dsIncomplete.show()
+---+----+
| a| b|
+---+----+
|foo|null|
|foo| bar|
+---+----+
你可以做的一种方法是将其转换为[Person],而不是将其转换为[Person],你可以从中建立schema(StructType),并在读取json文件时应用它。
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.schema(schema).json(path).as[Person]
peopleDS.show
+-------+----+
| name| age|
+-------+----+
|Michael|null|
+-------+----+
代码文件的内容是:
{"name":"Michael"}
从@Sathiyan S的回答中,我得到了以下的解决方案(在此介绍一下,因为它并没有完全解决我的问题,但却成为了正确的方向的指针)。
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{StructField, StructType}
// created expected schema
val schema = Encoders.product[SchemaClass].schema
// convert all fields as nullable
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = true, m)
})
// apply expected and nullable schema for parsing json string
session.read.schema(newSchema).json(jsonIncompleteRddString).as[SchemaClass]
优点:
null
,与数据类型无关case class
将被忽略