I have a DataFrame that comes with a bunch of columns, but I only need a few of them. I created a case class to model the DataFrame, expecting the undeclared columns to be dropped, but that did not happen. After a while I found this: https://issues.apache.org/jira/browse/SPARK-19477.
Apparently it used to work that way, but not in Spark 2+, because Dataset.as[T]
is lazy. A user, Christophe Préaud, said that a workaround is to map the dataset with identity: ds.map(identity).
This works for me locally.
case class Customer(customerId: String, email: String)

// as[Customer] is lazy; map(identity) forces the projection down to the declared columns
dfCustomer.as[Customer].map(identity)
But when I run this on a Zeppelin cluster, it throws Task not serializable.
Here is the full exception:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
... 67 elided
Caused by: java.io.NotSerializableException: com.propzmedia.mcloud.spark.context.Context
Serialization stack:
- object not serializable (class: com.propzmedia.mcloud.spark.context.Context, value: com.propzmedia.mcloud.spark.context.Context@217cf66)
- field (class: $iw, name: context, type: class com.propzmedia.mcloud.spark.context.Context)
- object (class $iw, $iw@2edbe0e2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7e562fd)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5120325e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@a6b8aaf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d4f4d73)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@421db8eb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1a5b5058)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7ae4d738)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@349d00d2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f848718)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@41c748a5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e0582d5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6ea14094)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d75d38)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@366f9a2a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7a1cac8a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@46b9801a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a3b9ad5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e75f3ec)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@fd19d93)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7e4b8238)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@428ac521)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@67a3f236)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2f36aae7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4dc2f0cb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@63dcb5c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4fd575eb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e835e49)
- field (class: $line116227906798.$read, name: $iw, type: class $iw)
- object (class $line116227906798.$read, $line116227906798.$read@33de578)
- field (class: $iw, name: $line116227906798$read, type: class $line116227906798.$read)
- object (class $iw, $iw@52f8a56e)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@a4e44c3)
- element of array (index: 3)
- array (class [Ljava.lang.Object;, size 11)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 98 more
I thought all case classes in Scala were serializable. I don't know whether this is a problem with Spark Datasets rather than with Scala itself, or whether the map(identity) function itself is not serializable. Does anyone know?
From the log you provided, it looks like you have created an instance of the com.propzmedia.mcloud.spark.context.Context
class and it is being passed into code that runs on the Spark executors. Context is not serializable (see the "Caused by: java.io.NotSerializableException" line), so the closure that references it cannot be shipped to the executors.
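The usual fix is to keep the non-serializable Context out of the closure entirely: copy the values you actually need into local vals before calling map (or mark the field @transient, or make Context extend Serializable). A minimal, Spark-free sketch of the mechanism, using a hypothetical Context stand-in (the real class and its fields are not shown in the question):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the non-serializable Context class in the log.
class Context {
  val tablePrefix: String = "customer_"
}

object ClosureCaptureDemo {
  // Returns true if `obj` survives Java serialization, which is what
  // Spark's ClosureCleaner.ensureSerializable checks before shipping a task.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val context = new Context

    // BAD: the closure references `context`, so serializing the closure
    // tries to serialize the whole Context instance and fails.
    val bad: String => String = id => context.tablePrefix + id

    // GOOD: copy just the value you need into a local val first;
    // the closure then captures only a String, which is serializable.
    val prefix = context.tablePrefix
    val good: String => String = id => prefix + id

    println(s"bad serializable: ${serializable(bad)}")
    println(s"good serializable: ${serializable(good)}")
  }
}
```

In Zeppelin (and any Scala REPL) this is aggravated by the interpreter wrapping every paragraph in the $iw objects you can see in the serialization stack: a closure can drag in the entire notebook state through that chain, so assigning the needed fields to local vals inside the same paragraph is the simplest way to break it.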