Spark and Scala: Task not serializable because of SparkContext


Hi, posting for the first time out of desperation ^^U, I am struggling to make this work. The idea is: starting from a DataFrame with a column that holds a list of ids, I want to return a new DataFrame with a new column that holds, for each of those ids, the list of measurements recorded in the past for that id. I am getting a "Task not serializable" error, and I think it points at the SparkContext instance, as shown in the log:

- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
    - field (class: Myclass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class Myclass$$anonfun$6, <function1>)
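As far as I understand it, the general pattern behind this error is that everything referenced inside the function passed to rdd.map has to be serializable, and SparkContext never is. A minimal sketch of the pattern as I understand it (not my real job):

import org.apache.spark.{SparkConf, SparkContext}

object ClosureSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val rdd = sc.parallelize(Seq(1, 2, 3))

    // Fine: the closure only captures serializable values
    val doubled = rdd.map(_ * 2).collect()

    // Throws org.apache.spark.SparkException: Task not serializable,
    // because the closure passed to map captures sc itself
    // val broken = rdd.map(x => sc.parallelize(Seq(x)).count()).collect()

    println(doubled.mkString(","))
    sc.stop()
  }
}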

I guess there is something inside the map function that cannot be there, because the trace points at the SparkContext, so I am now passing the SparkContext explicitly as a parameter to myMethod and myDaoMethod, and all my classes implement Serializable.

Any help is welcome. Thank you.
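To make the intended transformation concrete, here is a toy input and the desired output. The values are made up, the column names are the ones from my code, and I am assuming the implicit sqlContext from myMethod is in scope:

val input = sqlContext.createDataFrame(Seq(
  ("sensorA", "202301", "05", "20230105", "m1", "[1, 5, 24]")
)).toDF("name", "ym", "dd", "ymd", "mag", "list_id")

// Desired result: the same columns plus a listValues column that holds,
// for every id in list_id, a past measurement of that id as a String,
// e.g. listValues = ["12.3", "45.1", "7.8"]

And this is the method where it fails: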

def myMethod(df: DataFrame, factory: myFactory, sc: SparkContext)
                       (implicit sqlContext: SQLContext): DataFrame = {

  import java.util

  import java.time.LocalDate
  import java.time.format.DateTimeFormatter

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  // function to return the date n weeks before the given date
  // (input parsed as yyyyMMdd; note that LocalDate.toString yields ISO-8601, i.e. yyyy-MM-dd)
  val getDateNWeeksAgo: (String, Int) => String =
    (date, n) => LocalDate.parse(date, DateTimeFormatter.BASIC_ISO_DATE).minusWeeks(n).toString

  val myNewDF = df.rdd.map(r => {

    val name = r.getAs[String]("name")
    val ym: String = r.getAs[String]("ym")
    val dd: String = r.getAs[String]("dd")
    val ymd: String = r.getAs[String]("ymd")
    val mag = r.getAs[String]("mag")
    val listId = r.getAs[String]("list_id") // list of ids as a String, e.g. "[1, 5, 24]"
    val listSplit = listId.substring(1, listId.length - 1).split(",").map(_.trim) // Array("1", "5", "24")

    val listValues = new util.ArrayList[String]() // list to store the past measurements, one per id

    for (id <- 0 until listSplit.length) { // loop through the array of ids
      var value = 0d
      // DAO call inside the map closure: the closure has to capture sqlContext and sc,
      // which is what I think triggers the Task not serializable error
      val meas1wAgo = findValueById(
        myDao.MyDaoMethod(name, getDateNWeeksAgo(ymd, 1), mag)(sqlContext, sc),
        listSplit(id))
      /* more code regarding the algorithm, with more measures n weeks ago */
      value = meas1wAgo.toDouble
      listValues.add(value.toString)
    }

    Row(name, ym, dd, mag, listId, listValues)
  })

  // Define the schema for the resulting DataFrame
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("meas_ym", StringType, nullable = false),
    StructField("meas_dd", StringType, nullable = false),
    StructField("mag", StringType, nullable = false),
    StructField("list_id", StringType, nullable = false),
    StructField("listValues", DataTypes.createArrayType(DataTypes.StringType), nullable = false)
  ))

  // Create a DataFrame from the RDD[Row] with the specified schema
  val DFwithValues = sqlContext.createDataFrame(myNewDF, schema)

  DFwithValues
}

MyDaoMethod is defined outside this larger method; it queries the database correctly and returns a DataFrame with the measurements for the requested date, given a name, a date and a mag.
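The real implementation lives in the DAO, but roughly it has this shape; the body below is only a placeholder stub and the parameter names are mine, the relevant part is the explicit (sqlContext, sc) parameter list that matches the call inside the map:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

object myDao {
  // Placeholder stub: the real method queries the database and returns
  // the measurements for the given name, date and mag
  def MyDaoMethod(name: String, date: String, mag: String)
                 (sqlContext: SQLContext, sc: SparkContext): DataFrame = {
    sqlContext.emptyDataFrame
  }
}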

findValueById is also defined externally and, given a DataFrame and the id of a measurement, correctly returns the measurement as a String.
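Again, the real one is defined elsewhere; as a sketch it does something along these lines (the column names "id" and "value" are assumptions on my part):

import org.apache.spark.sql.DataFrame

// Sketch: return, as a String, the "value" of the row whose "id" matches
def findValueById(df: DataFrame, id: String): String = {
  df.filter(df("id") === id).select("value").head().getString(0)
}

Note that this works on DataFrames itself, so when called from inside rdd.map it would run on the executors, which may be part of the same problem.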

The stack trace I get is the following:

diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:415)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:405)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2353)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:393)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:392)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.map(RDD.scala:392)
    at /* user comment: the line with the .map call */ (...scala:307)
    [...]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:675)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@42ebd0a3)
    - field (class: MyClass$$anonfun$6, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class Myclass$$anonfun$6, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:412)
    ... 25 more