使用自己的方法修改Spark中数据集的var类型

问题描述 投票:0回答:1

以下是我面临的问题的最小示例。我有一个要修改的数组,因为它有大约一百万个元素。除最后一条语句外,以下代码有效。

 import spark.implicits._

 case class Frame(x: Double, var y: Array[Double]) {
    def total(): Double = {
        return y.sum
    }
    def modifier(): Unit = {
        for (i <- 0 until y.length) {
            y(i) += 10
        }
        return
    }
 }

 val df = Seq(
               (1.0, Array(0, 2, 1)),
               (8.0, Array(1, 2, 3)),
               (9.0, Array(11, 21, 23))
             ).toDF("x", "y")

 val ds = df.as[Frame]
 ds.show

 ds.map(_.total()).show     // works
 ds.map(_.modifier()).show  // does not work

错误如下:

scala> ds.map(_.modifier()).show
<console>:50: error: Unable to find encoder for type Unit. An implicit Encoder[Unit] is needed to store Unit instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       ds.map(_.modifier()).show

我看不到问题的根源。对于修复该错误的帮助,我将不胜感激。

scala apache-spark apache-spark-dataset
1个回答
1
投票

实际上,这与'var'或'val'无关,它与可变数据结构有关。问题是modifier返回Unit(例如什么都没有),因此您无法映射该结果。您可以使用:

运行它
case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
  return y.sum
}
def modifier(): Frame = {
  for (i <- 0 until y.length) {
    y(i) += 10
  }
  return this
}

}

但是我认为我没有多大意义,您应该避免状态易变。此外,我会在case中保持简单的案例类(即无逻辑),仅将它们用作数据容器。如果到那时必须增加每个元素,则也可以这样:

case class Frame(x: Double, val y: Array[Double])

ds.map(fr => fr.copy(y = fr.y.map(_+10.0))).show    
© www.soinside.com 2019 - 2024. All rights reserved.