如果使用消息的 kafka 流应用程序有一些异常,如何从 apache Kafka 重新读取消息

问题描述 投票:0回答:1

我正在使用 kafka 流读取来自 kafka 的消息进行实时处理。 然后,我将此消息发送到其他 rest api(最终服务),如果在计算最终服务中的消息时出现错误,它将向主服务抛出错误。 在主要服务中,现在,我想在特定时间间隔后或根据场景立即重新阅读这些消息。

场景-

  • 如果最终服务不工作,连接超时异常。在这种情况下,应在特定时间间隔后重试消息。
  • 如果出现技术异常,则应立即重新处理消息。

java apache-kafka apache-kafka-streams
1个回答
0
投票

如果您不提交消耗的记录,您只需重新启动应用程序。但是,要立即重试对最终 API 的请求,您必须为来自 Kafka 的每条消费记录编写代码来执行此操作; Kafka Streams 只会尝试将记录传递给您的 API 客户端一次。

否则,您需要向后寻找消费者群体,如评论中所述,或使用

kafka-consumer-groups --reset-offsets
CLI 每个主题可能是您的拓扑的一部分。

旁注:Kafka Streams 不应用于对外部端点进行同步请求/回复调用。为此使用具有更严格偏移处理的 Kafka Connect 或 Consumer API

© www.soinside.com 2019 - 2024. All rights reserved.