我正在尝试实现一个 Scala Spark Aggregator,其中包含非基本类型(例如,Map[String, Set[String]])的 Map 作为其缓冲区。似乎我可以使用 kryo 或 ExpressionEncoder 来编码基元集合(例如,Set[String]),但是当我将其嵌入到 Map 中时,它似乎找不到编码器。
如何为此类嵌套类型创建编码器?
我尝试过以下方法:
def bufferEncoder: Encoder[Map[String, Set[String]]] = Encoders.kryo[Map[String, Set[String]]]
和
def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])
对于我编写的另一个聚合器,我使用了
def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]
有效。
但是当我尝试前两个选项时,我收到此错误:
java.lang.UnsupportedOperationException:找不到编码器 java.util.Map[字符串,数组[字符串]]
- 根类:“java.util.Map”,位于 org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:567) 在 scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
虽然根据我的评论,我不会推荐 kyro 这段代码:
def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])
它本身不会起作用。
不幸的是,第 2 点并未在入门页面中直接调用。
import sparkSession.implicits._
val enc = implicitly[Encoder[Map[String,Set[String]]]]
会起作用,但是,正如您所发现的,这不会
import sparkSession.implicits._
val enc = implicitly[ExpressionEncoder[Map[String,Set[String]]]]