Duplicate messages are delivered to @DltHandler multiple times, even after the message is acknowledged


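I am testing Kafka's @RetryableTopic feature: after a message has been retried 3 times, I want to push it to an AWS SQS dead-letter queue. Instead, I see 4 invocations from the retry threads and 2 invocations of the DltHandler. As far as I understand, in Spring Retry the @DltHandler annotation marks the method that handles messages which have failed after all retry attempts are exhausted and have been moved to the dead-letter queue (DLQ), so I expected processMessage() to be called only once. I log the thread name to track calls to the consumer() method and identify which thread invokes it. Which part am I missing?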

@Bean(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY)
public KafkaTemplate<String, String> kafkaTemplateForDlt() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public RetryTopicConfiguration myRetryTopic(@Qualifier(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY)KafkaTemplate<String, String> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}
}

      
@Slf4j
@Component
public class ProductEventConsumer {

    @Autowired
    private ProductServiceImpl productServiceImpl;

    @Autowired
    ObjectMapper objectMapper;

    @Value("${aws.sqsDLQ}")
    private String productdlq;

    @Autowired
    private SqsTemplate sqsTemplate;

    @RetryableTopic(
            backoff = @Backoff(delayExpression = "10000", multiplierExpression = "0"),
            attempts = "3",
            kafkaTemplate = ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY,
            include = {SocketTimeoutException.class, ArithmeticException.class})
    @KafkaListener(id = ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_ID, idIsGroup = false,
            topics = "#{'${spring.kafka.product-topic}'}",
            containerFactory = ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_FACTORY)
    public void consumer(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
        try {
            log.info("START:Received request via kafka:{} thread:{}", consumerRecord.value(),
                    Thread.currentThread().getName());
            int result = 10 / 0;
            ack.acknowledge();
        } catch (JsonProcessingException e) {
            log.error("END:Exception occured while saving item:{}", e.getMessage());
        }
    }

    @DltHandler
    public void processMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
        try {
            log.error("START:Pushing message to SQS DLQ:{}", consumerRecord.key());
            sqsTemplate.send(sqsSendOptions -> sqsSendOptions.queue(productdlq).payload(consumerRecord.value()));
        } catch (Exception e) {
            log.error("END:Failure while pushing msg to sqs dlq:{} key:{}", e.getMessage(), consumerRecord.key());
        } finally {
            ack.acknowledge();
        }
    }

}
spring-kafka spring-retry
1 Answer

Please take a look at this example. I believe the branch so-78201070 meets your needs:

return RetryTopicConfigurationBuilder.newInstance()
        .retryTopicSuffix("-" + appName + "-retry")
        .maxAttempts(3) // original processing + 2x retry
        .fixedBackOff(1000L) // 1000ms delay between retries
        .suffixTopicsWithIndexValues()
        .dltSuffix("-" + appName + ".dlt")
        .includeTopic(topicToInclude)
        .dltHandlerMethod("eventListener", "processDltEvent")
        .create(template);

The test retryAndDlt sends an event that is retried twice and is then sent to the DLQ/DLT.
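To make the expected counts concrete, here is a minimal plain-Java sketch of the arithmetic behind maxAttempts(3): one original delivery plus two retries, followed by a single hand-off to the DLT handler. It is only a simulation and does not use Spring Kafka or reproduce its internals; the names RetrySimulation, Result and deliver are hypothetical and exist only for this illustration.

```java
import java.util.function.Consumer;

// Plain-Java simulation of delivery counting; hypothetical names, no Spring Kafka involved.
class RetrySimulation {

    /** Holds how often the listener and the DLT handler were invoked for one record. */
    static final class Result {
        final int listenerCalls;
        final int dltCalls;

        Result(int listenerCalls, int dltCalls) {
            this.listenerCalls = listenerCalls;
            this.dltCalls = dltCalls;
        }
    }

    /**
     * Delivers one payload to the listener up to maxAttempts times.
     * The first call is the original delivery, the remaining calls are retries.
     * If every attempt throws, the record is handed to the DLT exactly once.
     */
    static Result deliver(int maxAttempts, String payload, Consumer<String> listener) {
        int listenerCalls = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            listenerCalls++;
            try {
                listener.accept(payload);
                return new Result(listenerCalls, 0); // success: the DLT is never reached
            } catch (RuntimeException e) {
                // failure: continue with the next attempt, if any is left
            }
        }
        return new Result(listenerCalls, 1); // attempts exhausted: one DLT delivery
    }

    public static void main(String[] args) {
        // A listener that always fails, similar to the 10 / 0 in the question.
        Result r = deliver(3, "event", p -> {
            throw new ArithmeticException("/ by zero");
        });
        System.out.println("listenerCalls=" + r.listenerCalls + " dltCalls=" + r.dltCalls);
    }
}
```

Under this model a record that always fails produces three listener calls and one DLT call, which is the baseline to compare logged thread names and call counts against.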

Hope that helps.
