我正在将一些 Scala Spark UDAF 从 UserDefinedAggregateFunction 迁移到 Aggregator。其中之一采用 Array[String] 作为输入,在本地测试中执行聚合器时,我遇到了一个奇怪的异常。我已将代码简化为非常基本的聚合器,但在读取数组时仍然遇到此错误。我对其他输入类型没有问题。
简化示例如下:
class ArrayInputAggregator extends Aggregator[Array[String], Int, Int] with Serializable {
override def zero = {0}
override def reduce(buffer: Int, newItem: Array[String]): Int = {
buffer + newItem.length
}
override def merge(b1: Int, b2: Int): Int = {
b1 + b2
}
override def finish(reduction: Int): Int = reduction
def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
}
我正在用这段代码测试它:
val test = udaf(new ArrayInputAggregator())
val d = spark
.sql("select array('asd','tre','asd') arr")
.groupBy()
.agg(test($"arr").as("cnt"))
d.show
这是我遇到的异常:
2023-12-24 12:06:24,678 错误 Spark.executor.Executor - 异常 任务 0.0 在阶段 0.0 (TID 0) java.lang.NullPointerException: null at org.apache.spark.sql.catalyst.expressions.objects.MapObjects$.apply(objects.scala:682) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3033) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3029) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3029) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3018) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:96) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:96)
(一直持续下去)
看来这个bug已经在Spark 3.0.1及更高版本中修复了。