如何使用嵌套 Map 作为 Spark 聚合器中的缓冲区?

问题描述 投票:0回答:1

我正在尝试实现一个 Scala Spark Aggregator,其中包含非基本类型(例如,Map[String, Set[String]])的 Map 作为其缓冲区。似乎我可以使用 kryo 或 ExpressionEncoder 来编码基元集合(例如,Set[String]),但是当我将其嵌入到 Map 中时,它似乎找不到编码器。

如何为此类嵌套类型创建编码器?

我尝试过以下方法:

def bufferEncoder: Encoder[Map[String, Set[String]]] = Encoders.kryo[Map[String, Set[String]]]

def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])

对于我编写的另一个聚合器,我使用了

  def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]

有效。

但是当我尝试前两个选项时,我收到此错误:

java.lang.UnsupportedOperationException:找不到编码器 java.util.Map[字符串,数组[字符串]]

  • 根类:“java.util.Map”,位于 org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:567) 在 scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
scala apache-spark kryo
1个回答
0
投票

虽然根据我的评论,我不会推荐 kyro 这段代码:

def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])

它本身不会起作用。

  1. 不要使用ExpressionEncoder,只使用Encoder(ExpressionEncoder是一个实现细节)。
  2. 您必须导入sparkSession.implicits._才能获得正确的编码器派生(使用Sparks内置编码器)。这些隐式绑定到 Expression 类型 - 而不是 ExpressionEncoder
  3. 仅在需要时考虑具体化编码器,并在需要时传递隐式编码器,这将使交换您使用的编码器变得更容易。 (例如,如果您使用无框)

不幸的是,第 2 点并未在入门页面中直接调用。

import sparkSession.implicits._
val enc = implicitly[Encoder[Map[String,Set[String]]]]

会起作用,但是,正如您所发现的,这不会

import sparkSession.implicits._
val enc = implicitly[ExpressionEncoder[Map[String,Set[String]]]]
© www.soinside.com 2019 - 2024. All rights reserved.