我有一个主题,其中包含两种类型的消息:一种是创建数据,一种是更新。我遇到的情况是某些更新事件在创建事件之前到达。我正在尝试实现 Spring Kafka 重试以再次重试更新消息。
我看到了几种方法:
阻止重试:我已更新
DefaultErrorHandler
并提供了FixedBackOff
。不过,我认为如果更新和创建消息位于同一分区,那么更新消息最终将转到 ConsumerRecordRecoverer
,到目前为止,我只是记录它。
非阻塞重试:我正在尝试使用
@RetryableTopic
注释。但是,我没有找到一种方法来明确提供重试主题和消费者名称。
对于选项#2,是否可以以编程方式提供主题名称和消费者名称,而不是依赖框架?
很大程度上取决于您的系统的要求。例如,尽快处理更新(包括与被阻止更新无关的更新)有多重要?如果不是那么重要,那么一旦遇到无法处理的更新,您可以完全暂停处理。
但就我个人而言,我只是将失败的消息写入不同的重试更新主题。这意味着您可以继续处理原始流。如果您遇到更多更新,您可能还需要推迟对同一对象的其他更新,具体取决于它在您的系统中的工作方式。
然后可以更宽松地处理重试更新流。即使这样,您也可以拥有不同的重试更新主题(retry-update-1、retry-update-2),并在尝试之前有不同的等待间隔。如果您绝对必须保持所有内容的流式传输并且各个更新根本不应该相互冲突,那么这非常有用。
例如
retry-update-1: retry after 1 second
retry-update-2: retry after 5 seconds
retry-update-3: retry after 1 minute
retry-update-4: retry after 1 hour
我不知道 Spring Kafka API 在这里支持多少,但是使用 Kafka 库这很容易做到,这也适用于 Spring Boot 应用程序。
您有一个
org.apache.kafka.streams.processor.ProcessorContext
课程可以让您安排这些事情。
context.schedule(scheduledInterval, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
...
}