Spark streaming:com.esotericsoftware.kryo.KryoException:java.lang.IllegalArgumentException:Class未注册:scala.Tuple2 $ mcJZ $ sp

问题描述 投票:2回答:2

我试图在火花流中使用Kryo Serializer。我在Spark tuning docs读到 -

最后,如果你没有注册你的自定义类,Kryo仍然会工作,但它必须存储每个对象的完整类名,这是浪费。

所以我想尝试注册所有课程。我的案例类是 -

trait Message extends java.io.Serializable

object MutableTypes {
  type Childs = scala.collection.mutable.Map[Int, (Long, Boolean)]
  type Parents = scala.collection.mutable.Map[Int, Childs]
}

case class IncomingRecord(id_1: String, id_raw: String, parents_to_add: MutableTypes.Parents, parents_to_delete: MutableTypes.Parents) extends Message

我正在注册这样的课程 -

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired","true")
sparkConf.registerKryoClasses(Array(classOf[Tuple2[Long,Boolean]],classOf[IncomingRecord]))

我有这个例外:

com.esotericsoftware.kryo.KryoException:java.lang.IllegalArgumentException:未注册类:scala.Tuple2 $ mcJZ $ sp注意:要注册此类,请使用:kryo.register(scala.Tuple2 $ mcJZ $ sp.class);序列化跟踪:parent.to_add(com.test.IncomingRecord)at com.esotericsoftware.kryo.serializers.FieldSerializer $ ObjectField.write(FieldSerializer.java:585)at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213 )在com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)at org.apache.spark.storage.DiskBlockObjectWriter.write (DiskBlockObjectWriter.scala:185)org.apache.spark.util.collection.WritablePartitionedPairCollection $$ anon $ 1.writeNext(WritablePartitionedPairCollection.scala:56)at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala) :659)org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)位于org.apache.spark的org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)。 scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)atg.apache.spark.scheduler.Task.run(Task.scala:89)at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala: 214)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617)在java.lang.Thread.run(Thread.java:745)

我如何注册我的课程?怎么解决这个?

更新:

我知道转向注册false将删除异常,但由于额外的开销,这不会增加那么多的性能。我想知道如何注册我的课程。

scala serialization apache-spark spark-streaming kryo
2个回答
0
投票

最后,如果你没有注册你的自定义类,Kryo仍然会工作,但它必须存储每个对象的完整类名,这是浪费。仅当使用spark.kryo.registrationRequired的默认值时才会出现这种情况“(这是假的)

以下应解决异常问题(或避免为此参数设置任何值并使用默认值false)

.set("spark.kryo.registrationRequired","false")

更多信息可以在这里找到:http://spark.apache.org/docs/latest/configuration.html

spark.kryo.registrationRequired false(默认值)是否要求注册Kryo。如果设置为'true',如果未注册的类被序列化,Kryo将抛出异常。如果设置为false(默认值),Kryo将编写未注册的类名以及每个对象。编写类名会导致显着的性能开销,因此启用此选项可以严格执行用户未从注册中省略的类。

一些要点 - 如何注册kryo序列化:


0
投票

我在另一个stackoverflow答案中提供了一个方法来获取所有需要快速注册的类名。

见:https://stackoverflow.com/a/55644422/5981256

© www.soinside.com 2019 - 2024. All rights reserved.