Hi, first post here, out of desperation ^^U — I have been trying to get this to work. The idea is: starting from a DataFrame with a column that holds a list of ids, I want to return a new DataFrame with a new column holding, for each of those ids, the list of past measures recorded for that id. I get a "Task not serializable" error, which I believe points at the SparkContext instance, as the log shows:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
- field (class: Myclass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class Myclass$$anonfun$6, <function1>)
I guess there is something inside the map function that must not be there, since the error points at SparkContext. That is why I am now passing the SparkContext explicitly as a parameter to
myMethod
and MyDaoMethod
, and all my classes implement Serializable.
Any help is welcome. Thank you.
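In case a minimal reproduction helps, I believe the failing pattern reduces to the sketch below (hypothetical names, not my real code): any reference to `sc` or `sqlContext` inside the function passed to `rdd.map` forces Spark's ClosureCleaner to serialize the context together with the closure, and SparkContext is deliberately not serializable:

```scala
// Minimal sketch of the anti-pattern (hypothetical names):
// the closure passed to .map captures `sc`, so Spark has to serialize
// the SparkContext itself and fails before the job even starts.
val rdd = sc.parallelize(Seq("a", "b"))
val broken = rdd.map { s =>
  // Any reference to sc (or sqlContext) here drags it into the closure:
  sc.parallelize(Seq(s)).count() // => Task not serializable
}
```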
def myMethod(df: DataFrame, factory: myFactory, sc: SparkContext)
            (implicit sqlContext: SQLContext): DataFrame = {
  import java.time.LocalDate
  import java.time.format.DateTimeFormatter
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  // Returns the date n weeks before `date` (dates in yyyyMMdd / BASIC_ISO_DATE format)
  val getDateNWeeksAgo: (String, Int) => String = (date: String, n: Int) =>
    LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE)
      .minusWeeks(n)
      .format(DateTimeFormatter.BASIC_ISO_DATE) // keep yyyyMMdd; .toString would give yyyy-MM-dd

  val myNewDF = df.rdd.map { r =>
    val name = r.getAs[String]("name")
    val ym: String = r.getAs[String]("ym")
    val dd: String = r.getAs[String]("dd")
    val ymd: String = r.getAs[String]("ymd")
    val mag = r.getAs[String]("mag")
    val listId = r.getAs[String]("list_id")                           // list as String, e.g. "[1, 5, 24]"
    val listSplit = listId.substring(1, listId.length - 1).split(",") // Array("1", " 5", " 24")
    val listValues = new java.util.ArrayList[String]()                // list to store the values per id
    for (id <- 0 until listSplit.length) {                            // loop through the array of ids
      var value = 0d
      val meas1wAgo = findValueById(
        myDao.MyDaoMethod(name, getDateNWeeksAgo(ymd, 1), mag)(sqlContext, sc),
        listSplit(id))
      /* more code regarding the algorithm, with more measures n weeks ago */
      value = meas1wAgo.toDouble
      listValues.add(value.toString)
    }
    Row(name, ym, dd, mag, listId, listValues)
  }

  // Define the schema for the resulting DataFrame
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("meas_ym", StringType, nullable = false),
    StructField("meas_dd", StringType, nullable = false),
    StructField("mag", StringType, nullable = false),
    StructField("list_id", StringType, nullable = false),
    StructField("listValues", DataTypes.createArrayType(DataTypes.StringType), nullable = false)
  ))

  // Create a DataFrame from the RDD[Row] with the specified schema
  val DFwithValues = sqlContext.createDataFrame(myNewDF, schema)
  DFwithValues
}
MyDaoMethod
is defined outside this larger method; it correctly queries the database and returns a DataFrame with the measures for the requested date, given a name, a date and a mag.
findValueById
is also defined externally and, given a DataFrame and the id of a measure, correctly returns that measure as a String.
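For what it is worth, the only serializable variant I can sketch (hypothetical names, and assuming the set of lookup keys can be enumerated up front and fits in driver memory) is to run the DAO queries on the driver, outside the closure, and broadcast plain data so the closure never captures the context:

```scala
// Sketch (hypothetical names): do the DAO lookups on the driver, then
// broadcast a plain Map so the .map closure only captures serializable data.
val lookups: Map[String, String] =
  idsOnDriver.map { id =>
    id -> findValueById(myDao.MyDaoMethod(name, date, mag)(sqlContext, sc), id)
  }.toMap                                  // driver side: using sc here is fine
val bLookups = sc.broadcast(lookups)

val withValues = df.rdd.map { r =>
  val id = r.getAs[String]("list_id")
  bLookups.value.getOrElse(id, "0")        // executor side: no sc involved
}
```

Whether that shape applies depends on how many distinct (name, date, mag) combinations there are; if the keys really vary per row, a join between the two DataFrames is probably the right structure instead.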
The stack trace I get is the following:
diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:415)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:405)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2353)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:393)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:392)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.map(RDD.scala:392)
at /* user comment: this is the line with the .map call */ (scala:307)
[...]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:675)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
- field (class: MyClass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class Myclass$$anonfun$6, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:412)
... 25 more