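We are upgrading from Spring Boot 2 / spring-kafka 2.8.4 to Spring Boot 3 / spring-kafka 3.1.2 and have to migrate from SeekToCurrentErrorHandler to CommonErrorHandler. Our first attempt was to replace the method that created the SeekToCurrentErrorHandler with one that creates a DefaultErrorHandler, using the constructor that takes a ConsumerRecordRecoverer and a BackOff. The ConsumerRecordRecoverer's BiFunction is never called.
What are we doing wrong in this upgrade?
We also tried creating a DefaultErrorHandler bean with this setup, with the same result.
Here is our setup, in a @Configuration class annotated with @EnableKafka: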
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> consumerFactory,
        KafkaTemplate<Object, Object> kafkaTemplate) {
    final var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory);
    factory.setCommonErrorHandler(
            feedsErrorHandler(kafkaTemplate, retryProperties.getMainTopic()));
    return factory;
}

private DefaultErrorHandler feedsErrorHandler(
        KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate, RetrySettings retrySettings) {
    final var backOffWithMaxRetries = createBackOffWithMaxRetries(retrySettings);
    return createCommonErrorHandler(kafkaTemplate, backOffWithMaxRetries);
}

private ExponentialBackOffWithMaxRetries createBackOffWithMaxRetries(
        RetrySettings retrySettings) {
    final var backOffWithMaxRetries =
            new ExponentialBackOffWithMaxRetries(retrySettings.getMaxRetries());
    backOffWithMaxRetries.setInitialInterval(retrySettings.getInitialInterval());
    backOffWithMaxRetries.setMultiplier(retrySettings.getMultiplier());
    backOffWithMaxRetries.setMaxInterval(retrySettings.getMaxInterval());
    return backOffWithMaxRetries;
}

private DefaultErrorHandler createCommonErrorHandler(
        KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate,
        ExponentialBackOffWithMaxRetries backOffWithMaxRetries) {
    final var defaultErrorHandler =
            new DefaultErrorHandler(
                    deadLetterPublishingRecoverer(kafkaTemplate), backOffWithMaxRetries);
    // Setup Retryable Exceptions here. And add them to the classifications map.
    // All other exceptions default to non-retryable.
    final var classified = new HashMap<Class<? extends Throwable>, Boolean>();
    retryProperties.getRetryableExceptions().forEach(aClass -> classified.put(aClass, true));
    defaultErrorHandler.setClassifications(classified, false);
    defaultErrorHandler.setCommitRecovered(true);
    return defaultErrorHandler;
}

private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
        KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate) {
    return new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (consumerRecord, e) -> {
                log.error("Error while processing [{}]", consumerRecord, e);
                final var topic = determineTopic(consumerRecord, e.getCause());
                log.error("Moving [{}] to [{}]", consumerRecord, topic);
                return new TopicPartition(topic, -1);
            });
}
See FailedRecordTracker. The logic there looks like this:
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
if (nextBackOff != BackOffExecution.STOP) {
    this.backOffHandler.onNextBackOff(container, exception, nextBackOff);
    return false;
}
else {
    attemptRecovery(record, exception, topicPartition, consumer);
    map.remove(topicPartition);
    if (map.isEmpty()) {
        this.failures.remove(currentThread);
    }
    return true;
}
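To make the consequence of that branch concrete, here is a minimal plain-Java model of it. It is only a sketch: CountingBackOff, onFailure and failuresUntilRecovered are hypothetical names invented for illustration, not spring-kafka classes, and the real tracker does considerably more bookkeeping. It shows that, under these assumptions, the recoverer is reached only on the failure that follows the last permitted retry.

```java
import java.util.ArrayList;
import java.util.List;

final class RecoveryDecisionSketch {

    /** Sentinel meaning "no more retries", playing the role of BackOffExecution.STOP. */
    static final long STOP = -1L;

    /** Hypothetical stand-in for a back-off execution with a fixed retry budget. */
    static final class CountingBackOff {
        private final int maxRetries;
        private final long interval;
        private int attempts = 0;

        CountingBackOff(int maxRetries, long interval) {
            this.maxRetries = maxRetries;
            this.interval = interval;
        }

        /** Returns the next delay, or STOP once the retry budget is used up. */
        long nextBackOff() {
            return attempts++ < maxRetries ? interval : STOP;
        }
    }

    /** Mirrors the if/else above: returns true only when the record was recovered. */
    static boolean onFailure(CountingBackOff backOff, String record, List<String> recovered) {
        long next = backOff.nextBackOff();
        if (next != STOP) {
            return false; // the real handler would back off and redeliver here
        }
        recovered.add(record); // stands in for attemptRecovery(...)
        return true;
    }

    /** Counts how many failed deliveries happen before the recoverer is reached. */
    static int failuresUntilRecovered(int maxRetries) {
        CountingBackOff backOff = new CountingBackOff(maxRetries, 1_000L);
        List<String> recovered = new ArrayList<>();
        int failures = 0;
        do {
            failures++;
        } while (!onFailure(backOff, "record-0", recovered));
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(failuresUntilRecovered(3)); // prints 4
    }
}
```

In this model, a retry budget of 3 means the listener has to fail four times for the same record before the recoverer runs, so a test that fails only once or twice would never reach it.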
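Make sure the retries are actually exhausted: the recoverer is only invoked once nextBackOff() returns BackOffExecution.STOP.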
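One way to check whether a test waits long enough is to work out the delay schedule implied by the back-off settings. The sketch below is a plain-Java approximation of an exponential back-off with a cap. The values in main (5 retries, 1000 ms initial interval, multiplier 2.0, 10000 ms maximum) are made-up examples, not the settings from the question, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class BackOffScheduleSketch {

    /** Delay in ms before each retry: initialInterval * multiplier^n, capped at maxInterval. */
    static List<Long> delays(int maxRetries, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        double current = initialInterval;
        for (int i = 0; i < maxRetries; i++) {
            result.add(Math.min((long) current, maxInterval));
            current *= multiplier;
        }
        return result;
    }

    /** Total time spent backing off before the retries run out. */
    static long total(List<Long> delays) {
        return delays.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> d = delays(5, 1_000L, 2.0, 10_000L);
        // prints [1000, 2000, 4000, 8000, 10000] total=25000 ms
        System.out.println(d + " total=" + total(d) + " ms");
    }
}
```

With these example values, roughly 25 seconds of back-off elapse before the final failure hands the record to the recoverer, not counting the time the listener itself takes on each attempt.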
It would be great if you could share a simple project that lets us reproduce the problem and step through it in debug mode.