我想在处理/反序列化失败时重试消费消息有限次。重试结束后,我想记录一条消息,提交偏移量并继续使用更多消息。为此,我配置了错误处理程序 bean,如下所示。
我使用 ack 模式作为 MANUAL_IMMEDIATE,自动提交被
enable.auto.commit: false
禁用
偏移量是通过引用确认对象 (ack.acknowledge()) 来手动/编程提交的
编辑:根据加里的评论,我编辑了错误处理程序的配置
@Bean
public DefaultErrorHandler errorHandler(){
ConsumerRecordRecoverer recovery = (record, ex) ->{
log.error("Retries have been exhausted. Commiting offset "+record.offset())
}
// Default backoff
BackOff backoff = new FixedBackOff(3000, 5);
DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recovery, backoff);
defaultErrorHandler.setCommitRecovered(true);
defaultErrorHandler.setAckAfterHandle(true);
// BackOff to use when HttpServerException occurs
BackOff backOff = new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
BiFunction<ConsumerRecord<?,?>,Exception, BackOff> backOffFunction = (record, ex) -> {
if(ex instanceOf HttpServerException){
return backOff;
}
return null;
}
defaultErrorHandler.removeClassification(DeserializationException.class);
defaultErrorHandler.setBackOffFunction(backOffFunction)
return defaultErrorHandler;
}
问题:
defaultErrorHandler.setCommitRecovered(true);
defaultErrorHandler.setAckAfterHandle(true);
defaultErrorHandler.removeClassification(DeserializationException.class);
注意:我已经尝试了上面的配置,它似乎有效。我想向专家核实一下,这个配置是否达到了预期的效果,并且不会造成任何其他影响。
是的,一切都配置正确。
有没有办法为给定的异常配置重试策略?例如,在数据库不可用错误时无限重试消费消息。
看
/**
* Set a function to dynamically determine the {@link BackOff} to use, based on the
* consumer record and/or exception. If null is returned, the default BackOff will be
* used.
* @param backOffFunction the function.
* @since 2.6
*/
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
还有
/**
* Set to true to reset the retry {@link BackOff} if the exception is a different type
* to the previous failure for the same record. The
* {@link #setBackOffFunction(BiFunction) backOffFunction}, if provided, will be
* called to get the {@link BackOff} to use for the new exception; otherwise, the
* configured {@link BackOff} will be used. Default true since 2.9; set to false
* to use the existing retry state, even when exceptions change.
* @param resetStateOnExceptionChange true to reset.
* @since 2.6.3
*/
public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
(现在默认为真)。