Scala: how can I simplify or reuse pattern-matching logic that has side effects?


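I'm looking for a way to refactor this code and make it cleaner. I'm sure there is one, but I haven't been able to figure it out.

I'm using avro4s and need to enable serialization for Kafka, with two requirements:

  • Provide serializers for different case classes (e.g. UserCreated and UserDeleted)
  • For each case class, provide both binary and JSON serialization

The solution I came up with is to create two serializers that take the parent trait as input. In each serializer, I first pattern match on the input and then choose binary or JSON serialization.

In the code below, the key problem is the call to AvroOutputStream.binary[T] or AvroOutputStream.json[T]. I can't find a way to reduce the code duplication.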

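import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.AvroOutputStream
import org.apache.kafka.common.serialization.Serializer

trait KafkaMessage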

case class UserDeleted(name: String, age: Int) extends KafkaMessage

case class UserCreated(name: String, age: Int) extends KafkaMessage

class KafkaMessageBinarySerializer extends Serializer[KafkaMessage] with Serializable {

  override def serialize(topic: String, data: KafkaMessage): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()

    // TODO: optimise this
    val output = data match {
      case e: UserCreated =>
        val output = AvroOutputStream.binary[UserCreated].to(byteStream).build()
        output.write(e)
        output

      case e: UserDeleted =>
        val output = AvroOutputStream.binary[UserDeleted].to(byteStream).build()
        output.write(e)
        output
    }

    output.close()
    byteStream.toByteArray
  }
}

class KafkaMessageJsonSerializer extends Serializer[KafkaMessage] with Serializable {

  override def serialize(topic: String, data: KafkaMessage): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()

    // TODO: optimise this
    val output = data match {
      case e: UserCreated =>
        val output = AvroOutputStream.json[UserCreated].to(byteStream).build()
        output.write(e)
        output

      case e: UserDeleted =>
        val output = AvroOutputStream.json[UserDeleted].to(byteStream).build()
        output.write(e)
        output
    }

    output.close()
    byteStream.toByteArray
  }
}

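The question isn't really about serialization or Kafka; it's about types and pattern matching.

My solution badly needs improvement, but my tinkering hasn't produced anything good 😔 I'd love some suggestions on how to improve this.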

scala serialization apache-kafka pattern-matching avro4s
1 Answer

Do you mean something like this:

trait Dispatched[T] {
  type Out <: T
  val value: Out
  val builder: AvroOutputStreamBuilder[Out]
}
object Dispatched {
  def apply[T, T1 <: T](_value: T1, _builder: AvroOutputStreamBuilder[T1]): Dispatched[T] =
    new Dispatched[T] {
      type Out = T1
      val value = _value
      val builder = _builder
    }
}

trait CommonLogic[T] extends Serializer[T] with Serializable {

  protected val dispatch: T => Dispatched[T]

  override def serialize(topic: String, data: T): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    // could be made prettier with some custom extractor, but whatever
    val pair = dispatch(data)

    scala.util.Using(pair.builder.to(byteStream).build()) { output =>
      output.write(pair.value)
    }

    byteStream.toByteArray
  }
}

class KafkaMessageBinarySerializer extends CommonLogic[KafkaMessage] {

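  // explicit type so the pattern-matching literal has an expected function type
  override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {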
    case e: UserCreated => Dispatched(e, AvroOutputStream.binary[UserCreated])
    case e: UserDeleted => Dispatched(e, AvroOutputStream.binary[UserDeleted])
  }
}

class KafkaMessageJsonSerializer extends CommonLogic[KafkaMessage] {

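  // explicit type so the pattern-matching literal has an expected function type
  override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {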
    case e: UserCreated => Dispatched(e, AvroOutputStream.json[UserCreated])
    case e: UserDeleted => Dispatched(e, AvroOutputStream.json[UserDeleted])
  }
}

To reduce it further, you can look inside and see what the .json and .binary methods do:

// binary
new AvroOutputStreamBuilder[T](schema, encoder, AvroFormat.Binary)
// json
new AvroOutputStreamBuilder[T](schema, encoder, AvroFormat.Json)

They basically pass 2 implicit parameters and hardcode 1 flag. So you can do this:

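trait Dispatched[T] {
  type Out <: T
  val value: Out
  // SchemaFor is the avro4s type class supplied by the context bound below;
  // depending on the avro4s version, the builder may instead want the
  // underlying Avro schema obtained from it.
  val schema: SchemaFor[Out]
  val encoder: Encoder[Out]
}
object Dispatched {
  def apply[T, T1 <: T : SchemaFor : Encoder](_value: T1): Dispatched[T] =
    new Dispatched[T] {
      type Out = T1
      val value = _value
      // the implicits exist for the concrete subtype T1, not for T
      val schema = implicitly[SchemaFor[T1]]
      val encoder = implicitly[Encoder[T1]]
    }
}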

abstract class CommonLogic[T](format: AvroFormat) extends Serializer[T] with Serializable {

  protected val dispatch: T => Dispatched[T]

  override def serialize(topic: String, data: T): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    val triple = dispatch(data)
    val builder = new AvroOutputStreamBuilder(triple.schema, triple.encoder, format)

    scala.util.Using(builder.to(byteStream).build()) { output =>
      output.write(triple.value)
    }

    byteStream.toByteArray
  }
}

class KafkaMessageSerializer(format: AvroFormat) extends CommonLogic[KafkaMessage](format) {

  // could be derived with macros or type classes, but let's not go over the top
  override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {
    case e: UserCreated => Dispatched(e)
    case e: UserDeleted => Dispatched(e)
  }
}

class KafkaMessageBinarySerializer extends KafkaMessageSerializer(AvroFormat.Binary)

class KafkaMessageJsonSerializer extends KafkaMessageSerializer(AvroFormat.Json)
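
To see why pairing the value with its type class instance through a type member type-checks, here is a minimal, library-free sketch of the same idea. ToyEncoder is a hypothetical stand-in for avro4s' Encoder, and the string output is made up for illustration; nothing here is avro4s or Kafka API.

```scala
// Message hierarchy mirroring the question.
sealed trait KafkaMessage
final case class UserCreated(name: String, age: Int) extends KafkaMessage
final case class UserDeleted(name: String, age: Int) extends KafkaMessage

// Hypothetical stand-in for avro4s' Encoder; not part of any library.
trait ToyEncoder[A] { def encode(a: A): String }

object ToyEncoder {
  implicit val created: ToyEncoder[UserCreated] =
    new ToyEncoder[UserCreated] {
      def encode(a: UserCreated): String = s"created:${a.name}:${a.age}"
    }
  implicit val deleted: ToyEncoder[UserDeleted] =
    new ToyEncoder[UserDeleted] {
      def encode(a: UserDeleted): String = s"deleted:${a.name}:${a.age}"
    }
}

// Pairs a value with the encoder for its precise subtype.
trait Dispatched[T] {
  type Out <: T
  val value: Out
  val encoder: ToyEncoder[Out]
}

object Dispatched {
  def apply[T, T1 <: T](v: T1)(implicit enc: ToyEncoder[T1]): Dispatched[T] =
    new Dispatched[T] {
      type Out = T1
      val value: T1 = v
      val encoder: ToyEncoder[T1] = enc
    }
}

object Demo {
  // The only per-subtype code: one line per case class.
  val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {
    case e: UserCreated => Dispatched[KafkaMessage, UserCreated](e)
    case e: UserDeleted => Dispatched[KafkaMessage, UserDeleted](e)
  }

  // Shared logic: d.value has type d.Out and d.encoder is a ToyEncoder[d.Out],
  // so the call type-checks without knowing the concrete subtype.
  def serialize(msg: KafkaMessage): String = {
    val d = dispatch(msg)
    d.encoder.encode(d.value)
  }
}
```

The pattern match stays in one place, and everything after it is written once against the path-dependent type d.Out.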