Spark 3 KryoSerializer issue - Unable to find class: org.apache.spark.util.collection.OpenHashMap

Problem description

I'm upgrading a Spark 2.4 project to Spark 3.x. We've hit a roadblock with some existing Spark ML code:

var stringIndexers = Array[StringIndexer]()
for (featureColumn <- FEATURE_COLS) {
    stringIndexers = stringIndexers :+ new StringIndexer().setInputCol(featureColumn).setOutputCol(featureColumn + "_index")
}
val pipeline = new Pipeline().setStages(stringIndexers)
val dfWithNumericalFeatures = pipeline.fit(decoratedDf).transform(decoratedDf)

Specifically, this line:

val dfWithNumericalFeatures = pipeline.fit(decoratedDf).transform(decoratedDf)
now causes this cryptic exception on Spark 3:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 1 times, most recent failure: Lost task 0.0 in stage 238.0 (TID 5589) (executor driver): com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.util.collection.OpenHashMap$mcJ$sp$$Lambda$13346/2134122295
[info] Serialization trace:
[info] org$apache$spark$util$collection$OpenHashMap$$grow (org.apache.spark.util.collection.OpenHashMap$mcJ$sp)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:156)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
[info]  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
[info]  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
[info]  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
[info]  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
[info]  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
[info]  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
[info]  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
[info]  at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:397)
[info]  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
[info]  at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.deserialize(TypedAggregateExpression.scala:271)
[info]  at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.merge(interfaces.scala:568)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3(AggregationIterator.scala:199)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3$adapted(AggregationIterator.scala:199)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7(AggregationIterator.scala:213)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted(AggregationIterator.scala:207)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:77)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:107)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:85)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:131)
[info]  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
[info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]  at java.lang.Thread.run(Thread.java:750)
[info] Caused by: java.lang.ClassNotFoundException: org.apache.spark.util.collection.OpenHashMap$mcJ$sp$$Lambda$13346/2134122295
[info]  at java.lang.Class.forName0(Native Method)
[info]  at java.lang.Class.forName(Class.java:348)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
[info]  ... 36 more

I've searched around, and the only related issue I could find is this unanswered SO question describing the same problem: Spark Kryo serialization issue.

OpenHashMap is not used anywhere in my code, so this looks like a KryoSerializer bug triggered inside the Pipeline.fit() call. Any ideas how to resolve this? Thanks!

Edit: I also just tried removing the use of KryoSerializer during unit tests:

spark = SparkSession
      .builder
      .master("local[*]")
      .appName("UnitTest")
      .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .getOrCreate()

To confirm that I'm really using the JavaSerializer:

println(spark.conf.get("spark.serializer"))
which outputs org.apache.spark.serializer.JavaSerializer. However, the same failure still occurs, even without the KryoSerializer.

scala apache-spark apache-spark-mllib kryo spark3
1 Answer

Try changing the sparkVersion. I had the same problem with version 3.1.0; switching to 3.3.2 resolved it.
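
Not part of the original answer, but a minimal build.sbt sketch of the suggested version bump, assuming an sbt project on Scala 2.12 that depends on the standard spark-sql and spark-mllib artifacts (those assumptions may not match the asker's build):

// Bump the Spark version used across the build
val sparkVersion = "3.3.2"  // previously "3.1.0", where the Kryo error was observed

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

After changing the version, a clean rebuild (e.g. sbt clean test) helps ensure no stale Spark 3.1 jars remain on the classpath.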
