基于案例类和模式创建 Kafka Struct

问题描述 投票:0回答:1

我有很多案例类/模型,我需要为其自动创建 Kafka 结构。我有下面的代码可以手动实现此目的,但我必须为所有也可以具有不同字段的案例类手动执行此操作。有没有办法创建一个通用方法来接受案例类、模式和记录并从中生成 kafka 结构?

这里 xyz 是案例类/模型,其中包含 hitDateTime、marketplaceId、hitYear 和 hitMonth 字段。

  override val KAFKA_SCHEMA: Schema = xyz.KAFKA_SCHEMA
  override def buildKafkaStruct(record: Any): Struct = {
    val parsedRecord = record.asInstanceOf[xyz]
    val structRecord = new Struct(KAFKA_SCHEMA)
      .put("time", parsedRecord.time)
      .put("marketplace", parsedRecord.marketplace)

    structRecord
  }
}
scala apache-kafka apache-kafka-connect case-class
1个回答
-1
投票

您可以将反射与宏或 Shapeless 等库一起使用,以通用方式创建结构。这并不简单,但你绝对可以做到。

另一种方法是将 Avro 与 Kafka 一起使用。使用 Avro,您可以将案例类转换为 Avro 记录,反之亦然。有一些可用的库,例如 Avro4s,它允许您自动派生案例类的 Avro 架构和 Avro 二进制格式。

import com.sksamuel.avro4s.{AvroSchema, AvroOutputStreamcase class 
Xyz(hitDateTime: String, marketplaceId: String, hitYear: Int, hitMonth: 
Int)

val schema = AvroSchema[Xyz]
println(schema)  // You can review your Avro schema here

val xyzRecord = Xyz("2020-09-13T21:30:00", "001", 2020, 9)

val baos = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[Xyz].to(baos).build(schema)
output.write(xyzRecord)
output.close()

val avroBinaryDataForXyz = baos.toByteArray() // This is the Avro data you can send to Kafka
© www.soinside.com 2019 - 2024. All rights reserved.