以数组作为输入的 Spark 聚合器

问题描述 投票:0回答:1

我正在将一些 Scala Spark UDAF 从 UserDefinedAggregateFunction 迁移到 Aggregator。其中之一采用 Array[String] 作为输入,在本地测试中执行聚合器时,我遇到了一个奇怪的异常。我已将代码简化为非常基本的聚合器,但在读取数组时仍然遇到此错误。我对其他输入类型没有问题。

简化示例如下:

  class ArrayInputAggregator extends Aggregator[Array[String], Int, Int] with Serializable {

    override def zero = {0}

    override def reduce(buffer: Int, newItem: Array[String]): Int = {
      buffer + newItem.length
    }

    override def merge(b1: Int, b2: Int): Int = {
      b1 + b2
    }

    override def finish(reduction: Int): Int = reduction
    def bufferEncoder: Encoder[Int] = Encoders.scalaInt
    def outputEncoder: Encoder[Int] = Encoders.scalaInt
  }
}

我正在用这段代码测试它:

val test = udaf(new ArrayInputAggregator())

val d = spark
      .sql("select array('asd','tre','asd') arr")
      .groupBy()
      .agg(test($"arr").as("cnt"))

d.show

这是我遇到的异常:

2023-12-24 12:06:24,678 错误 Spark.executor.Executor - 异常 任务 0.0 在阶段 0.0 (TID 0) java.lang.NullPointerException: null at org.apache.spark.sql.catalyst.expressions.objects.MapObjects$.apply(objects.scala:682) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3033) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3029) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3029) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3018) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:96) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139) 〜[spark-catalyst_2.12-3.0.0.jar:3.0.0] 在 org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:96)

(一直持续下去)

scala apache-spark aggregate-functions user-defined-functions
1个回答
0
投票

看来这个bug已经在Spark 3.0.1及更高版本中修复了。

© www.soinside.com 2019 - 2024. All rights reserved.