如何在Spark 2.4中加载自定义转换器

问题描述 投票:0回答:1

我正在尝试在Spark 2.4.0中创建自定义转换器。保存它工作正常。但是,当我尝试加载它时,我收到以下错误:

java.lang.NoSuchMethodException: TestTransformer.<init>(java.lang.String)
  at java.lang.Class.getConstructor0(Class.java:3082)
  at java.lang.Class.getConstructor(Class.java:1825)
  at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:496)
  at org.apache.spark.ml.util.MLReadable$class.load(ReadWrite.scala:380)
  at TestTransformer$.load(<console>:40)
  ... 31 elided

这告诉我,它无法找到我的变换器的构造函数,这对我来说并没有多大意义。

MCVE:

import org.apache.spark.sql.{Dataset, DataFrame}
import org.apache.spark.sql.types.{StructType}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}

class TestTransformer(override val uid: String) extends Transformer with DefaultParamsWritable{

    def this() = this(Identifiable.randomUID("TestTransformer"))

    override def transform(df: Dataset[_]): DataFrame = {
        val columns = df.columns
        df.select(columns.head, columns.tail: _*)
    }

    override def transformSchema(schema: StructType): StructType = {
        schema
    }

    override def copy(extra: ParamMap): TestTransformer = defaultCopy[TestTransformer](extra)
}

object TestTransformer extends DefaultParamsReadable[TestTransformer]{

    override def load(path: String): TestTransformer = super.load(path)

}

val transformer = new TestTransformer("test")

transformer.write.overwrite().save("test_transformer")
TestTransformer.load("test_transformer")

运行此(我正在使用Jupyter笔记本)导致上述错误。我已经尝试编译并将其作为.jar文件运行,没有区别。

令我困惑的是,等效的PySpark代码运行良好:

from pyspark.sql import SparkSession, DataFrame
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class TestTransformer(Transformer, DefaultParamsWritable, DefaultParamsReadable):

    def transform(self, df: DataFrame) -> DataFrame:
        return df

TestTransformer().save('test_transformer')
TestTransformer.load('test_transformer')

如何制作可以保存和加载的自定义Spark变换器?

java scala apache-spark
1个回答
3
投票

我可以在spark-shell中重现你的问题。

试图找到问题的根源我查看了DefaultParamsReadableDefaultParamsReader来源,我可以看到他们利用Java反射。

https://github.com/apache/spark/blob/v2.4.0/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala

第495-496行

val instance =
    cls.getConstructor(classOf[String]).newInstance(metadata.uid).asInstanceOf[Params]

我认为scala REPL和Java反射不是好朋友。

如果您运行此代码段(在您之后):

new TestTransformer().getClass.getConstructors

你会得到以下输出:

res1: Array[java.lang.reflect.Constructor[_]] = Array(public TestTransformer($iw), public TestTransformer($iw,java.lang.String))

是真的! TestTransformer.<init>(java.lang.String)不存在。

我找到了2个解决方法,

  1. 用sbt编译你的代码并创建一个jar,然后用:require包含在spark-shell中,为我工作(你提到你试过一个罐子,我不知道怎么回事)
  2. 使用:paste -raw将代码粘贴在spark-shell中,也可以正常工作。我想-raw会阻止REPL对您的课程进行恶作剧。见:https://docs.scala-lang.org/overviews/repl/overview.html

我不知道你怎么能适应Jupyter,但我希望这些信息对你有用。

注意:我实际上在spark 2.4.1中使用了spark-shell

© www.soinside.com 2019 - 2024. All rights reserved.