读取 CQL 时间类型返回 UnsupportedOperationException,“No Encoder found for java.sql.Time”

问题描述 投票:0回答:1

我正在尝试使用 datastax 连接器将 Cassandra 表读入 Spark。我的表有 2 列使用

TIME
数据类型。我正在使用
java.sql.Time
作为数据集中的相应类型,但是 spark throws

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.sql.Time
- field (class: "java.sql.Time", name: "start")
- root class: "model.Trigger"

我试过用 Kryo 注册

Time
课程,但没有成功。我想知道我是否应该使用不同的类来与 Cassandra 的
TIME
类型交互,或者是否存在范围问题,因为我在主要方法中使用 Kryo 注册了该类,但在另一个方法中从 cassandra 检索数据(通过将 conf 生成的会话传递给方法)。

谢谢!

更新 4/12

我写了一个自定义映射器来解析时间,但是 spark 抛出以下内容

Exception in thread "main" java.lang.IllegalArgumentException: Unsupported type: java.sql.Time
    at com.datastax.spark.connector.types.TypeConverter$.forCollectionType(TypeConverter.scala:1025)
    at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:1038)
    at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:1057)

使用以下映射器

object ColumnMappers {
  private object LongToTimeConverter extends TypeConverter[Time] {
    override def targetTypeTag: universe.TypeTag[Time] = typeTag[Time]

    override def convertPF: PartialFunction[Any, Time] = {
      case l: Long => Time.valueOf(LocalTime.ofNanoOfDay(l))
    }
  }

  TypeConverter.registerConverter(LongToTimeConverter)

  private object TimeToLongConverter extends TypeConverter[Long] {
    override def targetTypeTag: universe.TypeTag[Long] = typeTag[Long]

    override def convertPF: PartialFunction[Any, Long] = {
      case t: Time => t.toLocalTime.toNanoOfDay
    }
  }

  TypeConverter.registerConverter(TimeToLongConverter)
}
scala apache-spark cassandra amazon-emr spark-cassandra-connector
1个回答
1
投票

问题不在于 Kryo,而在于您使用的类型 - 默认情况下,Cassandra 连接器不提供

time
类型的映射器(参见 documentation),因此您可能需要实施自己按照文档中的描述进行操作。

附言您可以尝试使用

Long
类型,因为它是 64 位数字,但我还没有在 source code 中看到显式映射。

© www.soinside.com 2019 - 2024. All rights reserved.