Avro编写java.sql.Timestamp转换错误

问题描述 投票:1回答:1

我需要为Kafka分区写一个时间戳,然后从中读取它。我为此定义了一个Avro架构:

{ "namespace":"sample",
  "type":"record",
  "name":"TestData",
  "fields":[
    {"name": "update_database_time", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

但是,我在producer.send行中收到转换错误:

java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long

我怎样才能解决这个问题?

以下是向Kafka写入时间戳的代码:

    val tmstpOffset = testDataDF
      .select("update_database_time")
      .orderBy(desc("update_database_time"))
      .head()
      .getTimestamp(0)

    val avroRecord = new GenericData.Record(parseAvroSchemaFromFile("/avro-offset-schema.json"))
    avroRecord.put("update_database_time", tmstpOffset)

    val producer = new KafkaProducer[String, GenericRecord](kafkaParams().asJava)
    val data = new ProducerRecord[String, GenericRecord]("app_state_test7", avroRecord)
    producer.send(data)
apache-spark apache-kafka spark-avro
1个回答
1
投票

Avro不直接支持时间戳的时间,但逻辑上很长。因此,您可以将其转换为long并使用如下所示。 unix_timestamp()函数用于转换,但如果您有特定的日期格式,请使用unix_timestamp(col,dataformat)重载函数。

import org.apache.spark.sql.functions._
val tmstpOffset = testDataDF
      .select((unix_timestamp("update_database_time")*1000).as("update_database_time"))
      .orderBy(desc("update_database_time"))
      .head()
      .getTimestamp(0)
© www.soinside.com 2019 - 2024. All rights reserved.