如何在被动kafka 0.8.2.2中重新运行失败的消息

问题描述 投票:2回答:2

我想重新发布x无法处理的消息。我看到手动提交代码:

val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffsetSink.publisher)
.map(processMessage(_)) // your message processing
.to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
.run()

但是,如何处理processMessage()方法中的异常?我想处理异常并要求kafka重新运行该消息3次。如果3次后仍然失败,则丢弃它。

apache-kafka kafka-consumer-api akka-stream reactive
2个回答
0
投票

考虑定义一个死信队列(像DB这样的其他存储也可以)来存储这些消息,然后,我们可以尝试分析和解决它们。


0
投票

我相信你能够处理read-at-least-once,但是你不想在出现故障的情况下永久停止,而是希望在重试失败的消息3次后继续处理剩余的消息。

这可以通过apache kafka中的简单消费者轻松实现。如果这是一个选项,让我知道,我会解释。

对于反应性kafka,在processMessage中重新尝试三次后,我们将不得不查看返回“已处理”消息返回的方式。这样,它将包括(在失败的消息的偏移量)下一个偏移提交到接收器(就像它被处理),并且不会再次出现。

© www.soinside.com 2019 - 2024. All rights reserved.