卡夫卡流 |有人可以指导如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行休息调用

问题描述 投票:0回答:0

卡夫卡流 |有人可以指导如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行休息调用

用例:主要需求是从主题中读取数据,并通过rest API将数据发送到外部系统。也有要求以相同的顺序将消息发送到目标系统。 示例:下面两条消息应该以相同的顺序传递到目标系统(通过 rest API)。如果第一条消息未能发送到目标系统,例如 400 个错误请求,则不应将同一客户的第二条消息(如下所述)发送到目标系统,并且两条消息都应推送到死信主题以供以后处理.所有其他记录应照常挑选。

第一条消息:客户 C1 -> 地址更新 -> 地址 1 第二条消息:客户 C1 -> 地址更新 -> 地址 2

这里有两个问题:

  1. 如何处理 400 错误请求的失败场景
  2. 如何将 REST 调用与 Kafka 流分离,否则吞吐量将减少,因为 REST 调用可能需要 200 毫秒,并且还有重试机制。

使用状态存储解决问题1的方法:

  1. 阅读主题并检查状态存储中是否存在密钥。
  2. 如果存在则将当前记录移动到 DL 主题并选择下一条记录。
  3. 如果不存在,请拨打外部休息电话。如果发生错误,将记录添加到状态存储中,并将当前数据也推送到 DL 主题中。 所以,在这里我利用状态存储来解决第一个问题。

问题 2 的解决方法 -> 可以想到 KTable,但问题是 Ktable 将始终拥有最新记录,而不是同一客户的所有记录。所以所有消息都不会传递到目标系统。

任何人都可以建议如何解决问题 2 或解决问题 1 的更好方法吗?

这里有两个问题:

  1. 如何处理 400 错误请求的失败场景
  2. 如何将 REST 调用与 Kafka 流分离,否则吞吐量将减少,因为 REST 调用可能需要 200 毫秒,并且还有重试机制。

使用状态存储解决问题1的方法:

  1. 阅读主题并检查状态存储中是否存在密钥。
  2. 如果存在则将当前记录移动到 DL 主题并选择下一条记录。
  3. 如果不存在,请拨打外部休息电话。如果发生错误,将记录添加到状态存储中,并将当前数据也推送到 DL 主题中。 所以,在这里我利用状态存储来解决第一个问题。

问题 2 的解决方法 -> 可以想到 KTable,但问题是 Ktable 将始终拥有最新记录,而不是同一客户的所有记录。所以所有消息都不会传递到目标系统。

任何人都可以建议如何解决问题 2 或解决问题 1 的更好方法吗?

apache-kafka-streams confluent-platform
© www.soinside.com 2019 - 2024. All rights reserved.