我正在尝试使用 datastax 连接器将 Cassandra 表读入 Spark。我的表有 2 列使用
TIME
数据类型。我正在使用 java.sql.Time
作为数据集中的相应类型,但是 spark throws
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.sql.Time
- field (class: "java.sql.Time", name: "start")
- root class: "model.Trigger"
我试过用 Kryo 注册
Time
课程,但没有成功。我想知道我是否应该使用不同的类来与 Cassandra 的TIME
类型交互,或者是否存在范围问题,因为我在主要方法中使用 Kryo 注册了该类,但在另一个方法中从 cassandra 检索数据(通过将 conf 生成的会话传递给方法)。
谢谢!
更新 4/12
我写了一个自定义映射器来解析时间,但是 spark 抛出以下内容
Exception in thread "main" java.lang.IllegalArgumentException: Unsupported type: java.sql.Time
at com.datastax.spark.connector.types.TypeConverter$.forCollectionType(TypeConverter.scala:1025)
at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:1038)
at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:1057)
使用以下映射器
object ColumnMappers {
private object LongToTimeConverter extends TypeConverter[Time] {
override def targetTypeTag: universe.TypeTag[Time] = typeTag[Time]
override def convertPF: PartialFunction[Any, Time] = {
case l: Long => Time.valueOf(LocalTime.ofNanoOfDay(l))
}
}
TypeConverter.registerConverter(LongToTimeConverter)
private object TimeToLongConverter extends TypeConverter[Long] {
override def targetTypeTag: universe.TypeTag[Long] = typeTag[Long]
override def convertPF: PartialFunction[Any, Long] = {
case t: Time => t.toLocalTime.toNanoOfDay
}
}
TypeConverter.registerConverter(TimeToLongConverter)
}
问题不在于 Kryo,而在于您使用的类型 - 默认情况下,Cassandra 连接器不提供
time
类型的映射器(参见 documentation),因此您可能需要实施自己按照文档中的描述进行操作。
附言您可以尝试使用
Long
类型,因为它是 64 位数字,但我还没有在 source code 中看到显式映射。