我有很多案例类/模型,我需要为其自动创建 Kafka 结构。我有下面的代码可以手动实现此目的,但我必须为所有也可以具有不同字段的案例类手动执行此操作。有没有办法创建一个通用方法来接受案例类、模式和记录并从中生成 kafka 结构?
这里 xyz 是案例类/模型,其中包含 hitDateTime、marketplaceId、hitYear 和 hitMonth 字段。
override val KAFKA_SCHEMA: Schema = xyz.KAFKA_SCHEMA
override def buildKafkaStruct(record: Any): Struct = {
val parsedRecord = record.asInstanceOf[xyz]
val structRecord = new Struct(KAFKA_SCHEMA)
.put("time", parsedRecord.time)
.put("marketplace", parsedRecord.marketplace)
structRecord
}
}
您可以将反射与宏或 Shapeless 等库一起使用,以通用方式创建结构。这并不简单,但你绝对可以做到。
另一种方法是将 Avro 与 Kafka 一起使用。使用 Avro,您可以将案例类转换为 Avro 记录,反之亦然。有一些可用的库,例如 Avro4s,它允许您自动派生案例类的 Avro 架构和 Avro 二进制格式。
import com.sksamuel.avro4s.{AvroSchema, AvroOutputStreamcase class
Xyz(hitDateTime: String, marketplaceId: String, hitYear: Int, hitMonth:
Int)
val schema = AvroSchema[Xyz]
println(schema) // You can review your Avro schema here
val xyzRecord = Xyz("2020-09-13T21:30:00", "001", 2020, 9)
val baos = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[Xyz].to(baos).build(schema)
output.write(xyzRecord)
output.close()
val avroBinaryDataForXyz = baos.toByteArray() // This is the Avro data you can send to Kafka