我使用 Spring Kafka 作为重试机制。我使用以下代码将 Kafka 侦听器配置为可重试主题:
@RetryableTopic(
include= {SystemException.class},
topicSuffixingStrategy= TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
fixedDelayTopicStrategy = FixedDelayStrategy.MULTIPLE_TOPICS,
retryTopicSuffix = "_retry",
dltTopicSuffix = "_dlq",
attempts = "3",
backoff = @Backoff( delay = 3600000L, multiplier = 1 ),
autoCreateTopics = "false")
如您所见,退避延迟为 3600000ms,相当于 1 小时。然而,重试实际上发生在异常或错误后大约 36 秒。延迟时间比1小时短很多。
我认为“max.poll.interval.ms”会触发重新平衡。所以我把“max.poll.interval.ms”设置得比延迟时间长,如下所示:
最大轮询间隔.ms = 3605000
这并没有解决问题。而且我认为我没有重新平衡问题,因为只有一个实例正在收听该主题。
我需要为 KafkaTemplate 或 ListenerConsumerFactory 设置一些属性吗?请帮忙。提前非常感谢您。
轮询间隔对此没有影响。
打开调试日志记录并查找这些:
Caused by: org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic so77360490-retry-0 is not ready for consumption, backing off for approx. 59483 millis.
(在本例中,延迟为 60000)。
然后你就会看到
DEBUG [so77360490-retry-0-0-C-1] Paused consumption from [so77360490-retry-0-0]
并且在时间到期之前它不会恢复。
有了3600000,我得到了
Partition 0 from topic so77360490-retry-0 is not ready for consumption, backing off for approx. 3599491 millis.
编辑
我让测试运行了一个小时,一个小时后,我明白了
Resumed consumption from [so77360490-retry-0-0]
Received: 1 records
Back off time: -42 ...
foo from so77360490-retry-0
(我听众的最后一首:
@KafkaListener(id = "so77360490", topics = "so77360490")
@RetryableTopic(backoff = @Backoff(delay = 3600000))
void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info(in + " from " + topic);
// throw new RuntimeException();
}