我正在寻找一种方法来重构这段代码并使其更清晰。我确信有办法,但一直没能弄清楚。
我正在使用 avro4s,我需要为 Kafka 启用序列化,并且我有两个标准:
我想出的解决方案是创建两个将父特征作为输入的序列化器,在每个序列化器中,我首先对输入进行模式匹配,然后选择二进制或 json 序列化。
在下面的代码中,关键问题是 AvroOutputStream.binary[T] 或 AvroOutputStream.json[T]。我找不到减少代码重复的方法。
// Common parent type for every message handled by the serializers below.
// NOTE(review): the trait is not sealed, so the compiler cannot check that
// pattern matches over KafkaMessage are exhaustive.
trait KafkaMessage
// Message variant carrying a name and an age; matched by type in the serializers.
case class UserDeleted(name: String, age: Int) extends KafkaMessage
// Message variant with the same fields as UserDeleted but a distinct type.
case class UserCreated(name: String, age: Int) extends KafkaMessage
/**
 * Kafka serializer that encodes a [[KafkaMessage]] through avro4s' binary output stream.
 *
 * The stream builder is parameterised by the concrete message type, so the value is
 * matched on its runtime type and written through a stream built for that type.
 */
class KafkaMessageBinarySerializer extends Serializer[KafkaMessage] with Serializable {

  /**
   * @param topic Kafka topic name; not used by this implementation
   * @param data  message to encode
   * @return the bytes written to the in-memory buffer
   * @throws scala.MatchError if `data` is neither a `UserCreated` nor a `UserDeleted`
   */
  override def serialize(topic: String, data: KafkaMessage): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    // TODO: the two branches differ only in the type argument; worth factoring out.
    data match {
      case created: UserCreated =>
        val stream = AvroOutputStream.binary[UserCreated].to(buffer).build()
        stream.write(created)
        stream.close()
      case deleted: UserDeleted =>
        val stream = AvroOutputStream.binary[UserDeleted].to(buffer).build()
        stream.write(deleted)
        stream.close()
    }
    buffer.toByteArray
  }
}
/**
 * Kafka serializer that encodes a [[KafkaMessage]] through avro4s' JSON output stream.
 *
 * The stream builder is parameterised by the concrete message type, so the value is
 * matched on its runtime type and written through a stream built for that type.
 */
class KafkaMessageJsonSerializer extends Serializer[KafkaMessage] with Serializable {

  /**
   * @param topic Kafka topic name; not used by this implementation
   * @param data  message to encode
   * @return the bytes written to the in-memory buffer
   * @throws scala.MatchError if `data` is neither a `UserCreated` nor a `UserDeleted`
   */
  override def serialize(topic: String, data: KafkaMessage): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    // TODO: the two branches differ only in the type argument; worth factoring out.
    data match {
      case created: UserCreated =>
        val stream = AvroOutputStream.json[UserCreated].to(buffer).build()
        stream.write(created)
        stream.close()
      case deleted: UserDeleted =>
        val stream = AvroOutputStream.json[UserDeleted].to(buffer).build()
        stream.write(deleted)
        stream.close()
    }
    buffer.toByteArray
  }
}
问题实际上并不在于序列化或 Kafka,而在于类型和模式匹配。
我的解决方案亟需改进,但我反复尝试也没有得到好的结果😔。如果能得到一些关于如何改进的建议,我将不胜感激。
你的意思是这样的吗:
/**
 * A value of some subtype of `T` paired with the output-stream builder for that
 * exact subtype. The subtype is kept as the type member `Out`, so `value` and
 * `builder` are guaranteed to agree on it.
 *
 * @tparam T the common parent type
 */
trait Dispatched[T] {
  /** The concrete subtype of `T` that this instance carries. */
  type Out <: T
  /** The value to be written. */
  val value: Out
  /** Builder producing an output stream that accepts `Out`. */
  val builder: AvroOutputStreamBuilder[Out]
}

object Dispatched {

  /**
   * Packs a value together with a builder for the same subtype.
   *
   * @param _value   the value to be written
   * @param _builder builder for output streams of `T1`
   * @tparam T  the parent type exposed to callers
   * @tparam T1 the concrete subtype; becomes `Out` of the result
   */
  def apply[T, T1 <: T](_value: T1, _builder: AvroOutputStreamBuilder[T1]): Dispatched[T] =
    new Dispatched[T] {
      override type Out = T1
      override val value: T1 = _value
      override val builder: AvroOutputStreamBuilder[T1] = _builder
    }
}
/**
 * Shared serialization logic: subclasses only supply `dispatch`, which pairs an
 * incoming value with the output-stream builder for its concrete type.
 *
 * @tparam T the parent type of all messages this serializer accepts
 */
trait CommonLogic[T] extends Serializer[T] with Serializable {

  /** Chooses the value/builder pair for a given message. */
  protected val dispatch: T => Dispatched[T]

  /**
   * Writes `data` into an in-memory buffer through the stream chosen by `dispatch`.
   *
   * @param topic Kafka topic name; not used by this implementation
   * @param data  message to encode
   * @return the bytes written to the buffer
   */
  override def serialize(topic: String, data: T): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    // could be made prettier with some custom extractor, but whatever
    // Fix: the parameter is named `data`; the original referenced an undefined `value`.
    val pair = dispatch(data)
    // Using.resource closes the stream and rethrows any failure. Plain Using(...)
    // returns a Try, and discarding it would hide write errors and return
    // whatever bytes happened to be in the buffer.
    scala.util.Using.resource(pair.builder.to(byteStream).build()) { output =>
      output.write(pair.value)
    }
    byteStream.toByteArray
  }
}
/** [[CommonLogic]] serializer that selects avro4s' binary builder for each message type. */
class KafkaMessageBinarySerializer extends CommonLogic[KafkaMessage] {
  // The explicit type gives the pattern-matching function literal its expected
  // parameter type and lets `Dispatched.apply` infer T = KafkaMessage; Scala 2
  // does not take it from the overridden member.
  override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {
    case e: UserCreated => Dispatched(e, AvroOutputStream.binary[UserCreated])
    case e: UserDeleted => Dispatched(e, AvroOutputStream.binary[UserDeleted])
  }
}
/** [[CommonLogic]] serializer that selects avro4s' JSON builder for each message type. */
class KafkaMessageJsonSerializer extends CommonLogic[KafkaMessage] {
  // The explicit type gives the pattern-matching function literal its expected
  // parameter type and lets `Dispatched.apply` infer T = KafkaMessage; Scala 2
  // does not take it from the overridden member.
  override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {
    case e: UserCreated => Dispatched(e, AvroOutputStream.json[UserCreated])
    case e: UserDeleted => Dispatched(e, AvroOutputStream.json[UserDeleted])
  }
}
为了进一步减少它,你可以查看内部,看看这些 .json 和 .binary 方法做了什么:
// binary: what `.binary[T]` boils down to, as quoted by this answer.
// NOTE(review): `schema` and `encoder` are presumably the implicit instances
// for T — confirm against the avro4s version in use.
new AvroOutputStreamBuilder[T](schema, encoder, AvroFormat.Binary)
// json: the same construction; only the format flag differs.
new AvroOutputStreamBuilder[T](schema, encoder, AvroFormat.Json)
它们基本上只是传入 2 个隐式参数,并硬编码 1 个格式标志。所以你可以这样做:
/**
 * A value of some subtype of `T` together with the avro4s type-class instances
 * for that exact subtype. The subtype is kept as the type member `Out`, so the
 * value, schema and encoder are guaranteed to agree on it.
 *
 * @tparam T the common parent type
 */
trait Dispatched[T] {
  /** The concrete subtype of `T` that this instance carries. */
  type Out <: T
  /** The value to be written. */
  val value: Out
  // Fix: declared as SchemaFor[Out] to match the instance summoned in `apply`;
  // the original `Schema[Out]` did not match what was assigned to it.
  // NOTE(review): assumes the AvroOutputStreamBuilder constructor accepts a
  // SchemaFor — confirm against the avro4s version in use.
  val schema: SchemaFor[Out]
  /** Encoder instance for `Out`. */
  val encoder: Encoder[Out]
}

object Dispatched {

  /**
   * Captures a value together with the implicit `SchemaFor` and `Encoder` of its
   * concrete type.
   *
   * @param _value the value to be written
   * @tparam T  the parent type exposed to callers
   * @tparam T1 the concrete subtype; becomes `Out` of the result
   */
  def apply[T, T1 <: T: SchemaFor : Encoder](_value: T1): Dispatched[T] =
    new Dispatched[T] {
      type Out = T1
      val value: T1 = _value
      // Fix: the context bounds are on T1, so the instances must be summoned
      // for T1; the original asked for the parent type T.
      val schema: SchemaFor[T1] = implicitly[SchemaFor[T1]]
      val encoder: Encoder[T1] = implicitly[Encoder[T1]]
    }
}
/**
 * Shared serialization logic parameterised by the output format: subclasses only
 * supply `dispatch`, which captures the schema and encoder of each concrete message.
 *
 * @param format the avro4s output format passed to every stream builder
 * @tparam T the parent type of all messages this serializer accepts
 */
abstract class CommonLogic[T](format: AvroFormat) extends Serializer[T] with Serializable {

  /** Captures the value with the type-class instances of its concrete type. */
  protected val dispatch: T => Dispatched[T]

  /**
   * Writes `data` into an in-memory buffer using a builder assembled from the
   * dispatched schema and encoder plus this serializer's format.
   *
   * @param topic Kafka topic name; not used by this implementation
   * @param data  message to encode
   * @return the bytes written to the buffer
   */
  override def serialize(topic: String, data: T): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    val triple = dispatch(data)
    val builder = new AvroOutputStreamBuilder(triple.schema, triple.encoder, format)
    // Using.resource closes the stream and rethrows any failure. Plain Using(...)
    // returns a Try, and discarding it would hide write errors and return
    // whatever bytes happened to be in the buffer.
    scala.util.Using.resource(builder.to(byteStream).build()) { output =>
      output.write(triple.value)
    }
    byteStream.toByteArray
  }
}
/**
 * Serializer for [[KafkaMessage]] values in the given format.
 *
 * @param format the avro4s output format forwarded to [[CommonLogic]]
 */
class KafkaMessageSerializer(format: AvroFormat) extends CommonLogic[KafkaMessage](format) {
  // could be derived with macros or type classes, but let's not go over the top
  // The explicit type gives the pattern-matching function literal its expected
  // parameter type and lets `Dispatched.apply` infer T = KafkaMessage; Scala 2
  // does not take it from the overridden member.
  override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {
    case e: UserCreated => Dispatched(e)
    case e: UserDeleted => Dispatched(e)
  }
}
/** [[KafkaMessageSerializer]] fixed to `AvroFormat.Binary`. */
class KafkaMessageBinarySerializer extends KafkaMessageSerializer(AvroFormat.Binary)
/** [[KafkaMessageSerializer]] fixed to `AvroFormat.Json`. */
// Fix: the closing parenthesis of the constructor argument list was missing.
class KafkaMessageJsonSerializer extends KafkaMessageSerializer(AvroFormat.Json)