import org.apache.spark.sql.Encoders
import org.apache.spark
import scala.reflect.runtime.universe._
case class StationX( stnId: Int,
wbanId: Int,
lat: Double,
lon: Double)
object Inst {
def getSchema[T <: Product : TypeTag] = Encoders.product[T].schema
val ss = getSchema[StationX]
}
上面会产生错误:
No TypeTag available for observatory.StationX
在 val ss = getSchema[StationX]
如何更正此代码?
这适用于 Spark shell REPL,但不适用于我的项目。
编辑已删除并替换为完整的重现方式。
重现方法:
测试 main 的构建,然后添加一个对象:
package observatory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Encoders
import scala.reflect.runtime.universe._
case class CC(i: Int)
object SparkInstance {
val spark = SparkSession
.builder()
.appName("Spark SQL UDF scalar example")
.getOrCreate()
// ** These don't work (or maybe just the def works) **
def getSchema[T <: Product : TypeTag] = Encoders.product[T].schema
// No TypeTag available for Double
val random = udf(() => Math.random())
// No TypeTag available for Int
val plusOne = udf((x: Int) => x + 1)
// No TypeTag available for observatory.CC
val ss = getSchema[CC]
}
并构建
在 build.sbt 中使用这些:
("org.apache.spark" %% "spark-sql" % "3.3.2").cross(CrossVersion.for3Use2_13),
("io.github.vincenzobaz" %% "spark-scala3-encoders" % "0.2.3"),
("io.github.vincenzobaz" %% "spark-scala3-udf" % "0.2.3"),
以及代码方面:
package observatory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoder
import scala.reflect.runtime.universe._
import scala3encoders.given
import scala3udf.{Udf => udf}
case class CC(i: Int)
object SparkInstance {
val spark = SparkSession
.builder()
.appName("Spark SQL UDF scalar example")
.getOrCreate()
def getSchema[T : Encoder] = implicitly[Encoder[T]].schema
val random = udf(() => Math.random())
val plusOne = udf((x: Int) => x + 1)
val ss = getSchema[CC]
}
您正在混合机制,scala3 文档声明您需要使用它的 udf 才能正确处理类型。类似地,getSchema 函数尝试使用 Spark 机制创建编码器,但由于使用了 scala 2 反射而失败。使用编码器上下文可以让 Spark-scala3 编码器派生工作。