以下是我面临的问题的最小示例。我有一个要修改的数组,因为它有大约一百万个元素。除最后一条语句外,以下代码有效。
import spark.implicits._
case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
return y.sum
}
def modifier(): Unit = {
for (i <- 0 until y.length) {
y(i) += 10
}
return
}
}
val df = Seq(
(1.0, Array(0, 2, 1)),
(8.0, Array(1, 2, 3)),
(9.0, Array(11, 21, 23))
).toDF("x", "y")
val ds = df.as[Frame]
ds.show
ds.map(_.total()).show // works
ds.map(_.modifier()).show // does not work
错误如下:
scala> ds.map(_.modifier()).show
<console>:50: error: Unable to find encoder for type Unit. An implicit Encoder[Unit] is needed to store Unit instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
ds.map(_.modifier()).show
我看不到问题的根源。对于修复该错误的帮助,我将不胜感激。
实际上,这与'var'或'val'无关,它与可变数据结构有关。问题是modifier
返回Unit
(例如什么都没有),因此您无法映射该结果。您可以使用:
case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
return y.sum
}
def modifier(): Frame = {
for (i <- 0 until y.length) {
y(i) += 10
}
return this
}
}
但是我认为我没有多大意义,您应该避免状态易变。此外,我会在case中保持简单的案例类(即无逻辑),仅将它们用作数据容器。如果到那时必须增加每个元素,则也可以这样:
case class Frame(x: Double, val y: Array[Double])
ds.map(fr => fr.copy(y = fr.y.map(_+10.0))).show