配置默认错误处理程序以在恢复后提交消息

问题描述 投票:0回答:1

我想在处理/反序列化失败时重试消费消息有限次。重试结束后,我想记录一条消息,提交偏移量并继续使用更多消息。为此,我配置了错误处理程序 bean,如下所示。

我使用 ack 模式作为 MANUAL_IMMEDIATE,自动提交被

enable.auto.commit: false 
禁用 偏移量是通过引用确认对象 (ack.acknowledge()) 来手动/编程提交的

编辑:根据加里的评论,我编辑了错误处理程序的配置

@Bean
public DefaultErrorHandler errorHandler(){

   ConsumerRecordRecoverer recovery = (record, ex) ->{
   
      log.error("Retries have been exhausted. Commiting offset "+record.offset())
    }
  
   // Default backoff
   BackOff backoff = new FixedBackOff(3000, 5);
   DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recovery, backoff);
   defaultErrorHandler.setCommitRecovered(true);
   defaultErrorHandler.setAckAfterHandle(true);

  // BackOff to use when HttpServerException occurs
     BackOff backOff = new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);

  BiFunction<ConsumerRecord<?,?>,Exception, BackOff> backOffFunction = (record, ex) -> {
  
  if(ex instanceOf HttpServerException){
      return backOff;
     }
    return null;
  }
   defaultErrorHandler.removeClassification(DeserializationException.class);
   defaultErrorHandler.setBackOffFunction(backOffFunction)
   return defaultErrorHandler;
}  

问题:

  1. 在重试(5)已用尽的情况下,想法是记录一条消息并提交偏移量,然后继续使用其他消息。为此,我在错误处理程序上配置了以下内容。请确认这是否足够?
defaultErrorHandler.setCommitRecovered(true);
defaultErrorHandler.setAckAfterHandle(true);
  1. 由于 DesrializationExeption 是致命列表的一部分,因此不会重试。因此,我将错误处理程序配置为以这种方式删除它,以便在发生反序列化错误时按照配置进行重试。请确认是否正确?

defaultErrorHandler.removeClassification(DeserializationException.class);

  1. 有没有办法为给定的异常配置重试策略?例如,在数据库不可用错误时无限重试消费消息。这在 Springboot/Spring-kafka 的早期版本中是可能的,因为可以在监听器容器上设置重试策略。但是,我在最新版本中无法弄清楚这一点。如果可能的话请帮我提供任何样品。

注意:我已经尝试了上面的配置,它似乎有效。我想向专家核实一下,这个配置是否达到了预期的效果,并且不会造成任何其他影响。

java spring-boot spring-kafka spring-retry
1个回答
1
投票

是的,一切都配置正确。

有没有办法为给定的异常配置重试策略?例如,在数据库不可用错误时无限重试消费消息。

/**
 * Set a function to dynamically determine the {@link BackOff} to use, based on the
 * consumer record and/or exception. If null is returned, the default BackOff will be
 * used.
 * @param backOffFunction the function.
 * @since 2.6
 */
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {

还有

/**
 * Set to true to reset the retry {@link BackOff} if the exception is a different type
 * to the previous failure for the same record. The
 * {@link #setBackOffFunction(BiFunction) backOffFunction}, if provided, will be
 * called to get the {@link BackOff} to use for the new exception; otherwise, the
 * configured {@link BackOff} will be used. Default true since 2.9; set to false
 * to use the existing retry state, even when exceptions change.
 * @param resetStateOnExceptionChange true to reset.
 * @since 2.6.3
 */
public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {

(现在默认为真)。

© www.soinside.com 2019 - 2024. All rights reserved.