Scala:如何捕获通过Kafka异步发送的所有多条消息的状态?

问题描述 投票:0回答:1

我正在使用Scala将消息发布到Kafka。我有发布到kafka的消息列表,可能是异步的。但是,最后,如果在发布至少一条消息时出现任何错误,我必须抓取并采取行动。我知道回调方法,但每个消息将工作一个。我想积累或弄清楚所有这些状态的方式,并决定是否有任何失败。像批次状态的东西。

for (message <- messageList) {
    // Create a message
    // producer is created
    // I have a separate util package, having the MyCallback class with overridden onCompletion method.
    producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), new MyCallback(config))

}
// here, I need help, to see if any of the messages sent is failed or not.
// How to capture the statuses of callback here for entire batch?


// Separate class file
class MyCallback (config: Map[String, String]) extends Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
     // if successful meta data, do something
     // if exception, I can log the error.

    }
}

基本上,我可以在回调类方法中检查单个消息状态。

在我的for循环结束后如何跟踪这些状态并基于此采取进一步行动?

scala callback apache-kafka kafka-producer-api
1个回答
0
投票

如果你在for循环之前移动new MyCallback(config),那么这可能会有所帮助。 然后你有一个可以收集异常的实例。 此外,回调对象将在循环后的范围内,您可以检索状态列表

我的scala有点生疏,但我认为它显示了这个想法

class MyCallback (config: Map[String, String]) extends Callback {

    private val exceptions = _ // some mutable List

    def getExceptions() { return exceptions }

    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
     if (exception != null) {
         exceptions.add(exception)
     }
    }
}

然后

val cb = new MyCallback(config)
for (message <- messageList) {
    producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), cb)
}

// TODO: check cb.getExceptions().size > 0

或者您可以通过在循环内检查来尽早尝试失败,但这将是异步的,并且可能无法正常工作

© www.soinside.com 2019 - 2024. All rights reserved.