I have some code like this:
import java.util.Date // assuming java.util.Date

object Helpers {
  val getPercentVariationInterval = (
      prevStartClose: Double,
      prevEndClose: Double,
      prevStartDate: Date,
      prevEndDate: Date,
      newClose: Double,
      newDate: Date
  ) => {
    // ... returns a (Double, Double, Date, Date)
  }

  val seqOp = (
      acc: (Double, Double, Date, Date, Double, Double, Long, Int),
      values: (Double, Double, Double, Double, Long, Int, Date)
  ) => {
    getPercentVariationInterval(...) // called with the relevant fields
    // ... returns a (Double, Double, Date, Date, Double, Double, Long, Int)
  }

  val compOP = (
      acc1: (Double, Double, Date, Date, Double, Double, Long, Int),
      acc2: (Double, Double, Date, Date, Double, Double, Long, Int)
  ) => {
    getPercentVariationInterval(...) // called with the relevant fields
    // ... returns a (Double, Double, Date, Date, Double, Double, Long, Int)
  }
}
object JobOne extends Serializable {
  val run = () => {
    val rdd = ...     // a pair RDD
    val zeroVal = ... // some zero value
    val result = rdd.aggregateByKey(zeroVal)(Helpers.seqOp, Helpers.compOP)
  }
}
object App {
  def main(args: Array[String]): Unit = {
    JobOne.run()
  }
}
The error is the following:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$1(PairRDDFunctions.scala:86)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:75)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$aggregateByKey$1(PairRDDFunctions.scala:168)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:157)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$aggregateByKey$5(PairRDDFunctions.scala:197)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:197)
at big_data.job_one.JobOne$.$anonfun$run$1(App.scala:101)
at big_data.job_one.App$.main(App.scala:116)
at big_data.job_one.App.main(App.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
- object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function0.apply:()Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$aggregateByKey$2:([BLscala/reflect/ClassTag;Lscala/runtime/LazyRef;)Ljava/lang/Object;, instantiatedMethodType=()Ljava/lang/Object;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$832/956429999, org.apache.spark.rdd.PairRDDFunctions$$Lambda$832/956429999@150ede8b)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$aggregateByKey$3:(Lscala/Function2;Lscala/Function0;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$833/925152318, org.apache.spark.rdd.PairRDDFunctions$$Lambda$833/925152318@4b3fe06e)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
... 21 more
I can't figure out which part of this is not serializable. I tried running seqOp and compOP by calling them directly in JobOne (e.g. println(seqOp(something, something))) and that works; the problem only shows up when I pass the functions into aggregateByKey. I have read several answers to similar questions, but none of them seem to help, whether it's extending Serializable or turning defs into function values.
I have also tried putting the three functions on their own in a separate object, I have tried passing them as anonymous functions straight into aggregateByKey, and I have tried changing the parameters and return types to simpler forms. Nothing works.
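One way to check whether a function value serializes on its own, independent of Spark, is to push it through plain Java serialization. This is only a sketch (checkSerializable is a hypothetical helper), and it only exercises the function value and whatever it captures, not the extra closures Spark itself constructs inside aggregateByKey:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical helper: throws java.io.NotSerializableException if `obj`,
// or anything it captures, cannot be Java-serialized.
def checkSerializable(obj: AnyRef): Unit = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(obj) finally out.close()
}

checkSerializable(Helpers.seqOp)  // fails fast if the function value itself is the problem
checkSerializable(Helpers.compOP)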
Here is the whole code, in case it's needed: https://scalafiddle.io/sf/kh5zcN4/0
Edit: sorry, I accidentally deleted the original question. It's 3 AM and I've been trying to make sense of this for hours.
You need to make the helper object extend Serializable, because when you use it in an RDD operation it has to be shipped to the worker nodes:
object Helpers extends Serializable {
...
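A related defensive pattern (a sketch only, not part of the code above) is to copy the functions into local vals before calling aggregateByKey, so the closures shipped to the executors capture just the function values rather than the whole enclosing object:

object JobOne extends Serializable {
  val run = () => {
    val rdd = ...     // as before
    val zeroVal = ... // as before

    // Local copies: the task closures now reference these local vals
    // instead of dragging in the entire Helpers object.
    val seqOp  = Helpers.seqOp
    val compOp = Helpers.compOP

    rdd.aggregateByKey(zeroVal)(seqOp, compOp)
  }
}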