I have a use case where I want to add another entry to a Map column. Here is the setup (using Scala 2.13.13 + Spark 3.3.1):
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, MapType, StringType, StructType}

val json =
  """
    [
      {
        "info" : {
          "1234" : {
            "name": "John Smith",
            "age": 29,
            "gender": "male"
          }
        }
      }
    ]
  """
val personSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
  .add("gender", StringType)
val schema = new StructType().add("info", MapType(StringType, personSchema))
val spark = SparkSession.builder()
  .master("local[*]")
  .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val df = spark.read.schema(schema).json(Seq(json).toDS)
df.show(false)
```
In this example I have an `info` object whose value is a Map from String to a person Struct. I want to dynamically build another person and add it to the `info` map, so that in JSON terms I would end up with:
```json
{
  "info": {
    "1234": {
      "name": "John Smith",
      "age": 29,
      "gender": "male"
    },
    "456": {
      "name": "Robert Jones",
      "age": 35,
      "gender": "male"
    }
  }
}
```
To do that, I added the following UDF:
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions.{col, udf}

val addPersonUDF = udf((infoMap: Map[String, Row]) => {
  infoMap + ("456" -> new GenericRowWithSchema(Array("Robert Jones", 35, "male"), personSchema))
})
df.select(col("*"), addPersonUDF(col("info"))).show(false)
```
But I keep getting this error:
```text
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
    at org.apache.spark.sql.errors.QueryExecutionErrors$.schemaForTypeUnsupportedError(QueryExecutionErrors.scala:1193)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:802)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:744)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
    at org.apache.spark.sql.functions$.$anonfun$udf$6(functions.scala:5124)
    at scala.Option.getOrElse(Option.scala:201)
    at org.apache.spark.sql.functions$.udf(functions.scala:5124)
```
What am I doing wrong?
Use a Scala case class for the person data so that Spark can infer the schema automatically. That way you avoid dealing with generic Rows, which `udf` cannot derive an encoder for.
```scala
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.StructType

case class Person(name: String, age: Integer, gender: String)

val schema = new StructType().add("info",
  ScalaReflection.schemaFor[Map[String, Person]].dataType)
val df = spark.read.schema(schema).json(Seq(json).toDS)

val addPersonUDF = udf((infoMap: Map[String, Person]) => {
  infoMap + ("456" -> Person("Robert Jones", 35, "male"))
})
df.select(col("*"), addPersonUDF(col("info"))).show(false)
```
Output:
```text
+--------------------------------+-----------------------------------------------------------------+
|info                            |UDF(info)                                                        |
+--------------------------------+-----------------------------------------------------------------+
|{1234 -> {John Smith, 29, male}}|{1234 -> {John Smith, 29, male}, 456 -> {Robert Jones, 35, male}}|
+--------------------------------+-----------------------------------------------------------------+
```
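As a side note, if the new entry is known at query-build time, you can skip the UDF entirely and use Spark's built-in `map_concat` (available since Spark 2.4) with `map` and `struct` column expressions. A minimal sketch, reusing the schema and values from the question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, map, map_concat, struct}
import org.apache.spark.sql.types.{IntegerType, MapType, StringType, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val personSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
  .add("gender", StringType)
val schema = new StructType().add("info", MapType(StringType, personSchema))

val json = """[{"info": {"1234": {"name": "John Smith", "age": 29, "gender": "male"}}}]"""
val df = spark.read.schema(schema).json(Seq(json).toDS)

// The struct field names and types must line up with personSchema.
val newPerson = map(
  lit("456"),
  struct(lit("Robert Jones").as("name"), lit(35).as("age"), lit("male").as("gender"))
)

// map_concat merges the two maps; note that by default it throws on
// duplicate keys unless spark.sql.mapKeyDedupPolicy is set to LAST_WIN.
df.select(map_concat(col("info"), newPerson).as("info")).show(false)
```

This stays entirely in the Catalyst expression layer, so it avoids the serialization round-trip a UDF incurs; the UDF approach remains the right tool when the new entries are computed from arbitrary JVM logic.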