我正在使用 kafka 流读取来自 kafka 的消息进行实时处理。 然后,我将此消息发送到其他 rest api(最终服务),如果在计算最终服务中的消息时出现错误,它将向主服务抛出错误。 在主要服务中,现在,我想在特定时间间隔后或根据场景立即重新阅读这些消息。
场景-
如果您不提交消耗的记录,您只需重新启动应用程序。但是,要立即重试对最终 API 的请求,您必须为来自 Kafka 的每条消费记录编写代码来执行此操作; Kafka Streams 只会尝试将记录传递给您的 API 客户端一次。
否则,您需要向后寻找消费者群体,如评论中所述,或使用
kafka-consumer-groups --reset-offsets
CLI 每个主题可能是您的拓扑的一部分。
旁注:Kafka Streams 不应用于对外部端点进行同步请求/回复调用。为此使用具有更严格偏移处理的 Kafka Connect 或 Consumer API