我正在使用Scala将消息发布到Kafka。我有发布到kafka的消息列表,可能是异步的。但是,最后,如果在发布至少一条消息时出现任何错误,我必须抓取并采取行动。我知道回调方法,但每个消息将工作一个。我想积累或弄清楚所有这些状态的方式,并决定是否有任何失败。像批次状态的东西。
for (message <- messageList) {
// Create a message
// producer is created
// I have a separate util package, having the MyCallback class with overridden onCompletion method.
producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), new MyCallback(config))
}
// here, I need help, to see if any of the messages sent is failed or not.
// How to capture the statuses of callback here for entire batch?
// Separate class file
class MyCallback (config: Map[String, String]) extends Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
// if successful meta data, do something
// if exception, I can log the error.
}
}
基本上,我可以在回调类方法中检查单个消息状态。
在我的for循环结束后如何跟踪这些状态并基于此采取进一步行动?
如果你在for循环之前移动new MyCallback(config)
,那么这可能会有所帮助。
然后你有一个可以收集异常的实例。
此外,回调对象将在循环后的范围内,您可以检索状态列表
我的scala有点生疏,但我认为它显示了这个想法
class MyCallback (config: Map[String, String]) extends Callback {
private val exceptions = _ // some mutable List
def getExceptions() { return exceptions }
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
exceptions.add(exception)
}
}
}
然后
val cb = new MyCallback(config)
for (message <- messageList) {
producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), cb)
}
// TODO: check cb.getExceptions().size > 0
或者您可以通过在循环内检查来尽早尝试失败,但这将是异步的,并且可能无法正常工作