How to set the default value "null" in a Dataset parsed from an RDD[String], applying a case class as the schema


I am starting from a given RDD[String] and trying to convert it into a Dataset of a specific case class. However, when a JSON string does not contain all the fields required by the case class, I get an exception saying that the missing column cannot be found.

How can I define default values for such cases?

I tried defining default values in the case class, but that did not solve the problem (see the sketch below). I am using Spark 2.3.2 and Scala 2.11.12.
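For reference, a sketch of what the attempt with default values looked like. Defaults only apply when constructing instances in plain Scala; Spark's analyzer resolves columns by name before the case class ever comes into play, so they do not help here:

case class SchemaClass(a: String, b: String = null)

// Direct construction uses the default:
SchemaClass("foo")   // == SchemaClass("foo", null)

// But spark.read.json(...).as[SchemaClass] still fails, because the
// analyzer cannot resolve column `b` against the parsed JSON schema.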

This code works fine:

import org.apache.spark.rdd.RDD

case class SchemaClass(a: String, b: String)

val jsonData: String = """{"a": "foo", "b": "bar"}"""
val jsonRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonData))

import spark.implicits._
val ds = spark.read.json(jsonRddString).as[SchemaClass]

But when I run this code instead:

val jsonDataIncomplete: String = """{"a": "foo"}"""
val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))

import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]

dsIncomplete.printSchema()
dsIncomplete.show()

I get the following exception:

org.apache.spark.sql.AnalysisException: cannot resolve '`b`' given input columns: [a];
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
[...]

Interestingly, the default value "null" is applied when the JSON strings are parsed from a file, as in the example from the Spark documentation where a Dataset is shown:

val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Content of the JSON file:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
Tags: scala, apache-spark, apache-spark-sql
4 Answers

1 vote

If you are on Spark 2.2+, you can now skip loading the JSON as an RDD and then reading it as a DataFrame, and instead convert the JSON string into a Dataset directly: val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS). Then you can:

  1. Load your JSON data
  2. Extract the schema from the case class, or define it manually
  3. Get the list of missing fields
  4. Set the missing columns to the default value lit(null).cast(col.dataType)
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

object DefaultFieldValue {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local").appName("DefaultFieldValue").getOrCreate()

    import spark.implicits._
    val jsonDataIncomplete: String = """{"a": "foo"}"""
    val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)

    // schema expected by the case class
    val schema: StructType = Encoders.product[SchemaClass].schema

    // keep only the fields that are actually missing from the parsed data
    // (comparing StructField against column-name Strings with diff never
    // matches, which would overwrite existing columns with null)
    val missingFields: Array[StructField] =
      schema.fields.filterNot(f => dsIncomplete.columns.contains(f.name))

    // add each missing field as a null column of the correct type
    val outdf = missingFields.foldLeft(dsIncomplete)((acc, f) =>
      acc.withColumn(f.name, lit(null).cast(f.dataType)))

    outdf.printSchema()
    outdf.show()
  }
}

case class SchemaClass(a: String, b: Int, c: String, d: Double)
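The same pattern can be factored into a small reusable helper. A sketch under the same assumptions; withMissingColumnsAsNull is a hypothetical name, not a Spark API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

// Hypothetical helper: add every field of `schema` that is not already
// present in `df` as a null column of the matching type.
def withMissingColumnsAsNull(df: DataFrame, schema: StructType): DataFrame =
  schema.fields
    .filterNot(f => df.columns.contains(f.name))
    .foldLeft(df)((acc, f) => acc.withColumn(f.name, lit(null).cast(f.dataType)))

// Usage:
// val outdf = withMissingColumnsAsNull(dsIncomplete, Encoders.product[SchemaClass].schema)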




1 vote
package spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, lit}

// defined at the top level so the encoder can be derived reliably
case class SchemaClass(a: String, b: Int)

object JsonDF extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val jsonDataIncomplete: String = """{"a": "foo", "m": "eee"}"""
  val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))

  val dsIncomplete = spark.read.json(jsonIncompleteRddString) // .as[SchemaClass] would fail here

  // schema expected by the case class
  lazy val schema: StructType      = Encoders.product[SchemaClass].schema
  lazy val fields: Array[String]   = schema.fieldNames
  lazy val colNames: Array[Column] = fields.map(col(_))

  val sch = dsIncomplete.schema

  // fields in the expected schema but not in the parsed data: add them as nulls
  val schemaDiff = schema.diff(sch)
  val rr = schemaDiff.foldLeft(dsIncomplete)((acc, field) =>
    acc.withColumn(field.name, lit(null).cast(field.dataType)))

  // fields in the parsed data but not in the expected schema: drop them,
  // then restore the column order of the case class
  val schDiff = sch.diff(schema)
  val rrr = schDiff
    .foldLeft(rr)((acc, field) => acc.drop(field.name))
    .select(colNames: _*)
}
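With the missing columns added, the extra columns dropped, and the column order aligned, the result can be converted into a typed Dataset. A sketch, continuing inside the object above:

// rrr now matches the case-class schema exactly, so this resolves:
val ds = rrr.as[SchemaClass]

// Caveat: b is a non-nullable Int, so deserializing a row where b is
// null (e.g. via ds.collect()) fails at runtime; declaring such fields
// as Option[Int] in the case class avoids that.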

1 vote

This works the same way if you have differing JSON strings in the same RDD; the error is thrown only when the only string you have does not match the schema.

For example:

val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete, jsonData))

import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]

dsIncomplete.printSchema()
dsIncomplete.show()

scala> dsIncomplete.show()
+---+----+
|  a|   b|
+---+----+
|foo|null|
|foo| bar|
+---+----+

One thing you can do is, instead of converting it with as[Person] directly, build the schema (StructType) from the case class and apply it while reading the JSON file:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[Person].schema

val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.schema(schema).json(path).as[Person]
peopleDS.show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
+-------+----+

Content of the JSON file:

{"name":"Michael"}

0 votes

Based on the answer from @Sathiyan S, I arrived at the following solution (presenting it here because it did not fully solve my problem, but it pointed me in the right direction).

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{StructField, StructType}

import spark.implicits._

// create the expected schema from the case class
val schema = Encoders.product[SchemaClass].schema

// mark all fields as nullable
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) => StructField(c, t, nullable = true, m)
})

// apply the expected, nullable schema when parsing the JSON strings
val dsIncomplete = spark.read.schema(newSchema).json(jsonIncompleteRddString).as[SchemaClass]
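A quick check of the result (the expected output assumes jsonIncompleteRddString holds only the single record {"a": "foo"} from the question):

dsIncomplete.printSchema()
dsIncomplete.show()
// +---+----+
// |  a|   b|
// +---+----+
// |foo|null|
// +---+----+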

Benefits:

  • All missing fields are set to null, regardless of their data type
  • Additional fields in the JSON string that are not part of the case class are ignored (see the sketch below)
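A sketch illustrating the second point; the field "x" is a made-up extra field, and newSchema and SchemaClass are taken from the code above:

val jsonWithExtraField = """{"a": "foo", "b": "bar", "x": "ignored"}"""
val rddWithExtra = spark.sparkContext.parallelize(List(jsonWithExtraField))

// "x" is not part of newSchema, so it is silently dropped
spark.read.schema(newSchema).json(rddWithExtra).as[SchemaClass].show()
// +---+---+
// |  a|  b|
// +---+---+
// |foo|bar|
// +---+---+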