防止两次创建对象

问题描述 投票:0回答:1

我有以下代码:

  type Request = EitherT[IO, Throwable, KkProducerRecordMetadata]

  def create(producer: => KkProducerCreator)
  : IO[Producer[String, String]]
  = IO {
    try {
      new KafkaProducer[String, String](properties(producer))
    } catch {
      case e: InstanceAlreadyExistsException => ???
    }

  }

  def send(producer: => IO[Producer[String, String]])
          (record: => KkProducerRecord)
  : Request = EitherT(for {
    p <- producer
    m <- IO {
      try {
        //If MaxBlockMs is not set,
        //then after 60s it will throw an exception
        val pr = new ProducerRecord[String, String](record.topic, record.key, record.value)
        val meta = p.send(pr).get()
        p.flush()

        Right(KkProducerRecordMetadata(meta.hasOffset,
          meta.hasTimestamp,
          meta.offset,
          meta.partition,
          meta.timestamp,
          meta.topic))
      } catch {
        case e: Exception => Left(e)
      }
    }
  } yield m)

  def close(producer: => IO[Producer[String, String]])
  : IO[Unit]
  = producer.map { p =>
    p.flush()
    p.close()
  }  

函数create创建了一个kafka生成器并包装到IO中,因为它可能产生副作用。

函数send将消息发送到kafka,正如你在第一个参数上看到的那样,它需要一个producer

我正在使用以下功能:

  //Creates a producer
  private val pSignIn: IO[Producer[String, String]] =
    KkProducer.create(KkProducerCreator(sys.env.get("KAFKA_SERVER").get,
      "AUTH-SIGNIN-PRODUCER",
      List(MaxBlockMsConfig(4000))))

并发送消息给kafka:

KkProducer.send(pSignIn)(KkProducerRecord(AuthTopology.SignInReqTopic,
        AuthTopology.SignInKey, a))  

正如您所看到的,每次调用send函数时,都会创建一个新的生产者实例,然后我得到一个异常:

javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=AUTH-SIGNIN-PRODUCER  

如何防止生产者以功能方式创建两次?我想创建一个生产者地图和id作为关键? 将StateT解决方案?

scala functional-programming kafka-producer-api
1个回答
0
投票

确保您创建一次生产者。将生成器实例分配给vallazy val,然后将val传递给函数。

sendclose函数的参数从名称调用更改为普通参数(按值调用)。

每次通过变量名称引用时,通过为每个send调用创建生产者来评估按名称调用参数。

如下所示声明你send

def send(producer: IO[Producer[String, String]])(record: => KkProducerRecord)

同样适用于close

例:

def foo(code: => Int) = code + code + 1

def bar(code: Int) = code + code + 1

Scala REPL

scala> :paste
// Entering paste mode (ctrl-D to finish)

def foo(code: => Int) = code + code + 1

def bar(code: Int) = code + code + 1


// Exiting paste mode, now interpreting.

foo: (code: => Int)Int
bar: (code: Int)Int

scala> foo({println("evaluated"); 1})
evaluated
evaluated
res1: Int = 3

scala> bar({println("evaluated"); 1})
evaluated
res2: Int = 3

注意:在foo的情况下,println块被评估两次不同于bar。

© www.soinside.com 2019 - 2024. All rights reserved.