Spark Dataset map(identity) in Scala is not serializable

Problem description

I have a DataFrame that comes with a bunch of columns, but I only need a few of them. I created a case class to model the DataFrame, hoping the undeclared columns would be dropped, but that did not happen. After a while I found this: https://issues.apache.org/jira/browse/SPARK-19477.

Apparently that used to be the behavior, but it is no longer the case in Spark 2+, because Dataset.as[T] is lazy. A user, Christophe Préaud, said a workaround is to map the dataset with identity, as in ds.map(identity).

This works for me locally:

case class Customer(customerId: String,
                    email: String)

// Re-encode through the case class so the extra columns are dropped
dfCustomer.as[Customer].map(identity)
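For reference, a quick way to confirm the projection locally (a minimal sketch; spark is the usual SparkSession, and dfCustomer is assumed to carry extra columns beyond the two in the case class):

import spark.implicits._

// Before: the plan still exposes every original column
dfCustomer.printSchema()

// After: only the case-class fields survive the identity map
val dsCustomer = dfCustomer.as[Customer].map(identity)
dsCustomer.printSchema()   // customerId, email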

But when I run the same code on a Zeppelin cluster, it fails with Task not serializable. Here is the full exception:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
  ... 67 elided
Caused by: java.io.NotSerializableException: com.propzmedia.mcloud.spark.context.Context
Serialization stack:
    - object not serializable (class: com.propzmedia.mcloud.spark.context.Context, value: com.propzmedia.mcloud.spark.context.Context@217cf66)
    - field (class: $iw, name: context, type: class com.propzmedia.mcloud.spark.context.Context)
    - object (class $iw, $iw@2edbe0e2)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7e562fd)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5120325e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@a6b8aaf)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@d4f4d73)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@421db8eb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1a5b5058)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7ae4d738)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@349d00d2)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3f848718)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@41c748a5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6e0582d5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6ea14094)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@d75d38)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@366f9a2a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7a1cac8a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@46b9801a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@2a3b9ad5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5e75f3ec)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@fd19d93)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7e4b8238)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@428ac521)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@67a3f236)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@2f36aae7)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4dc2f0cb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@63dcb5c)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4fd575eb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5e835e49)
    - field (class: $line116227906798.$read, name: $iw, type: class $iw)
    - object (class $line116227906798.$read, $line116227906798.$read@33de578)
    - field (class: $iw, name: $line116227906798$read, type: class $line116227906798.$read)
    - object (class $iw, $iw@52f8a56e)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@a4e44c3)
    - element of array (index: 3)
    - array (class [Ljava.lang.Object;, size 11)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 98 more

I thought every case class in Scala was serializable. I am not sure whether this is a problem with Spark Datasets rather than with Scala itself, or whether the identity function passed to map is itself not serializable. Does anyone know?

scala apache-spark apache-spark-sql apache-spark-dataset
1 Answer

From the log you have provided, it looks like you have created an instance of the com.propzmedia.mcloud.spark.context.Context class in your notebook, and that object is being captured by the closure Spark ships to the executors. The serialization stack shows it hanging off the REPL line wrappers ($iw): a Zeppelin/spark-shell paragraph wraps everything you define into nested objects, so referencing any value inside a map closure can drag the whole wrapper chain, including the non-serializable Context, into the task. Neither the case class nor map(identity) is at fault.
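The usual way out is to make sure that object never travels with the closure. A minimal sketch of two options, assuming Context is your own helper class (its no-arg constructor and the enrich method below are illustrative stand-ins, not taken from your code, and spark.implicits._ is assumed to be in scope for the encoders):

// Option 1: build the non-serializable helper on the executor,
// once per partition, so it never has to be serialized at all.
val fixed = dfCustomer.as[Customer].mapPartitions { customers =>
  val ctx = new Context()            // created executor-side, never shipped
  customers.map(c => ctx.enrich(c))  // enrich is a hypothetical method
}

// Option 2 (REPL/Zeppelin specific): mark the driver-side reference
// @transient so the closure cleaner does not try to ship it.
@transient val context = new Context()

Alternatively, if Context only holds plain data, making it extend Serializable also resolves the error.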
