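I am trying to test Kafka's @RetryableTopic feature: after a message has been retried 3 times, I want to push it to an SQS dead-letter queue (DLQ). What I see instead is 4 calls from the retry threads and 2 calls to the DltHandler. As far as I know, in Spring Kafka the @DltHandler annotation marks the method that handles messages which failed after all retry attempts were exhausted and were moved to the dead-letter topic, so I expected processMessage() to be called only once. I log the thread name on each call to consumer() to identify which thread is invoking it. Which part am I missing?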
    @Bean(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY)
    public KafkaTemplate<String, String> kafkaTemplateForDlt() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public RetryTopicConfiguration myRetryTopic(
            @Qualifier(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY) KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .create(template);
    }
}
@Slf4j
@Component
public class ProductEventConsumer {

    @Autowired
    private ProductServiceImpl productServiceImpl;

    @Autowired
    ObjectMapper objectMapper;

    @Value("${aws.sqsDLQ}")
    private String productdlq;

    @Autowired
    private SqsTemplate sqsTemplate;

    @RetryableTopic(
            backoff = @Backoff(delayExpression = "10000", multiplierExpression = "0"),
            attempts = "3",
            kafkaTemplate = ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY,
            include = {SocketTimeoutException.class, ArithmeticException.class})
    @KafkaListener(id = ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_ID, idIsGroup = false,
            topics = "#{'${spring.kafka.product-topic}'}",
            containerFactory = ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_FACTORY)
    public void consumer(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
        try {
            log.info("START:Received request via kafka:{} thread:{}",
                    consumerRecord.value(), Thread.currentThread().getName());
            int result = 10 / 0; // throws ArithmeticException to force the retry path
            ack.acknowledge();
        } catch (JsonProcessingException e) {
            log.error("END:Exception occurred while saving item:{}", e.getMessage());
        }
    }

    @DltHandler
    public void processMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
        try {
            log.error("START:Pushing message to SQS DLQ:{}", consumerRecord.key());
            sqsTemplate.send(sqsSendOptions -> sqsSendOptions.queue(productdlq).payload(consumerRecord.value()));
        } catch (Exception e) {
            log.error("END:Failure while pushing msg to sqs dlq:{} key:{}", e.getMessage(), consumerRecord.key());
        } finally {
            // always acknowledge, whether or not the SQS push succeeded
            ack.acknowledge();
        }
    }
}
Please take a look at this example. I believe the branch so-78201070 covers what you need:
return RetryTopicConfigurationBuilder.newInstance()
        .retryTopicSuffix("-" + appName + "-retry")
        .maxAttempts(3)        // original processing + 2x retry
        .fixedBackOff(1000L)   // 1000ms delay between retries
        .suffixTopicsWithIndexValues()
        .dltSuffix("-" + appName + ".dlt")
        .includeTopic(topicToInclude)
        .dltHandlerMethod("eventListener", "processDltEvent")
        .create(template);
The test retryAndDlt sends an event that is retried twice and then sent to the DLQ/DLT.
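To make the expected counts concrete, here is a minimal plain-Java sketch of the attempt counting, with no Spring and no broker involved. It only models the arithmetic "3 attempts = 1 original delivery + 2 retries, then one hand-off to the DLT". The class RetrySemanticsSketch, the method deliver, the Outcome type, and the topic names ("product", "-retry-N", "-dlt") are all hypothetical and chosen for illustration; they are not Spring Kafka APIs and not the suffixes configured above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetrySemanticsSketch {

    /** Records which (hypothetical) topics one record passed through. */
    public static final class Outcome {
        public final List<String> visited = new ArrayList<>();
        public int listenerCalls = 0;
        public int dltCalls = 0;
    }

    /**
     * Simulates delivering one record. maxAttempts counts the original
     * delivery plus the retries, so maxAttempts = 3 means 2 retries.
     */
    public static Outcome deliver(String topic, int maxAttempts, Consumer<String> listener) {
        Outcome outcome = new Outcome();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Attempt 1 reads the main topic; later attempts read indexed retry topics.
            String current = (attempt == 1) ? topic : topic + "-retry-" + (attempt - 2);
            outcome.visited.add(current);
            outcome.listenerCalls++;
            try {
                listener.accept(current);
                return outcome; // success: no further retries and no DLT
            } catch (RuntimeException e) {
                // failure: move on to the next attempt
            }
        }
        // All attempts failed: the record reaches the DLT exactly once.
        outcome.visited.add(topic + "-dlt");
        outcome.dltCalls++;
        return outcome;
    }

    public static void main(String[] args) {
        // A listener that always fails, like the 10 / 0 in the question.
        Outcome o = deliver("product", 3, t -> {
            throw new ArithmeticException("simulated failure");
        });
        System.out.println("visited=" + o.visited);
        System.out.println("listenerCalls=" + o.listenerCalls + ", dltCalls=" + o.dltCalls);
    }
}
```

Under these assumptions an always-failing listener is invoked three times and the DLT step once, so log output showing more listener or DLT-handler calls than that suggests the record is being consumed or published more than once somewhere in the setup.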
Hope this helps.