How do I return a Row from a Spark UDF?


I have a use case where I want to add another entry to a Map column. Here is the setup (Scala 2.13.13 + Spark 3.3.1):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.functions.{col, udf}
    import org.apache.spark.sql.types.{IntegerType, MapType, StringType, StructType}

    val json =
      """
         [
           {
              "info" : {
                "1234" : {
                  "name": "John Smith",
                  "age": 29,
                  "gender": "male"
                }
              }
           }
         ]
      """

    val personSchema = new StructType()
      .add("name", StringType)
      .add("age", IntegerType)
      .add("gender", StringType)
    val schema = new StructType().add("info", MapType(StringType, personSchema))

    val spark = SparkSession.builder()
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    val df = spark.read.schema(schema).json(Seq(json).toDS)
    df.show(false)

In this example, I have an info object that is a Map from String keys to a person Struct. I want to dynamically generate another Person and add it to the info map, so that in JSON I end up with:

{
  "info": {
    "1234": {
      "name": "John Smith",
      "age": 29,
      "gender": "male"
    },
    "456": {
      "name": "Robert Jones",
      "age": 35,
      "gender": "male"
    }
  }
}

To do this, I added the following UDF:

    val addPersonUDF = udf((infoMap: Map[String, Row]) => {
      infoMap + ("456" -> new GenericRowWithSchema(Array("Robert Jones", 35, "male"), personSchema))
    })
    df.select(col("*"), addPersonUDF(col("info"))).show(false)

But I keep getting this error:

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
    at org.apache.spark.sql.errors.QueryExecutionErrors$.schemaForTypeUnsupportedError(QueryExecutionErrors.scala:1193)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:802)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:744)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
    at org.apache.spark.sql.functions$.$anonfun$udf$6(functions.scala:5124)
    at scala.Option.getOrElse(Option.scala:201)
    at org.apache.spark.sql.functions$.udf(functions.scala:5124)

What am I doing wrong?

scala apache-spark user-defined-functions
1 Answer

Use a Scala case class for the person data so that Spark can infer the schema automatically. That way you avoid working with generic Rows.

import org.apache.spark.sql.catalyst.ScalaReflection

case class Person(name: String, age: Integer, gender: String)

// Let Spark derive the map's value type from the Person case class.
val schema = new StructType().add("info",
             ScalaReflection.schemaFor[Map[String, Person]].dataType)
val df = spark.read.schema(schema).json(Seq(json).toDS)

val addPersonUDF = udf((infoMap: Map[String, Person]) => {
  infoMap + ("456" -> Person("Robert Jones", 35, "male"))
})

df.select(col("*"), addPersonUDF(col("info"))).show(false)

Output:

+--------------------------------+-----------------------------------------------------------------+
|info                            |UDF(info)                                                        |
+--------------------------------+-----------------------------------------------------------------+
|{1234 -> {John Smith, 29, male}}|{1234 -> {John Smith, 29, male}, 456 -> {Robert Jones, 35, male}}|
+--------------------------------+-----------------------------------------------------------------+
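If you want the JSON shape shown in the question, a minimal follow-up sketch (an addition, not part of the original answer) is to rename the enriched column back to info and serialize each row with toJSON:

// Hypothetical follow-up: put the UDF result back under "info" and emit JSON strings.
val enriched = df.select(addPersonUDF(col("info")).as("info"))
enriched.toJSON.show(false)
// Should print something like:
// {"info":{"1234":{"name":"John Smith","age":29,"gender":"male"},"456":{"name":"Robert Jones","age":35,"gender":"male"}}}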