如何获取缺少 TypeTag 的 Spark 案例类架构和 UDF?

问题描述 投票:0回答:1
import org.apache.spark.sql.Encoders
import org.apache.spark
import scala.reflect.runtime.universe._

case class StationX(  stnId: Int,
                    wbanId: Int,
                    lat: Double,
                    lon: Double)
object Inst {
  def getSchema[T <: Product : TypeTag] = Encoders.product[T].schema
  val ss = getSchema[StationX]
}

上面会产生错误:

No TypeTag available for observatory.StationX
val ss = getSchema[StationX]

如何更正此代码?

这适用于 Spark shell REPL,但不适用于我的项目。

编辑已删除并替换为完整的重现方式。

重现方法:

下载项目

测试 main 的构建,然后添加一个对象:

package observatory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Encoders
import scala.reflect.runtime.universe._

case class CC(i: Int)
object SparkInstance {
  val spark = SparkSession
    .builder()
    .appName("Spark SQL UDF scalar example")
    .getOrCreate()

 // ** These don't work (or maybe just the def works) **

  def getSchema[T <: Product : TypeTag] = Encoders.product[T].schema

  // No TypeTag available for Double
  val random = udf(() => Math.random())

  // No TypeTag available for Int
  val plusOne = udf((x: Int) => x + 1)

  // No TypeTag available for observatory.CC
  val ss = getSchema[CC]
}

并构建

scala apache-spark schema
1个回答
0
投票

在 build.sbt 中使用这些:

  ("org.apache.spark" %% "spark-sql" % "3.3.2").cross(CrossVersion.for3Use2_13),
  ("io.github.vincenzobaz" %% "spark-scala3-encoders" % "0.2.3"),
  ("io.github.vincenzobaz" %% "spark-scala3-udf" % "0.2.3"),

以及代码方面:

package observatory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoder
import scala.reflect.runtime.universe._

import scala3encoders.given

import scala3udf.{Udf => udf}

case class CC(i: Int)
object SparkInstance {
  val spark = SparkSession
    .builder()
    .appName("Spark SQL UDF scalar example")
    .getOrCreate()

  def getSchema[T : Encoder] = implicitly[Encoder[T]].schema

  val random = udf(() => Math.random())

  val plusOne = udf((x: Int) => x + 1)

  val ss = getSchema[CC]
}

您正在混合机制,scala3 文档声明您需要使用它的 udf 才能正确处理类型。类似地,getSchema 函数尝试使用 Spark 机制创建编码器,但由于使用了 scala 2 反射而失败。使用编码器上下文可以让 Spark-scala3 编码器派生工作。

© www.soinside.com 2019 - 2024. All rights reserved.