我的Kafka Consumer重试10次异常,通过简单添加MDC日志来停止重试?

问题描述 投票:0回答:1

我有一个 Kafka 监听器,我向该主题发送一条 Kafka 消息,我的监听器接收了 10 次(第二次是在第一个消息完成其处理功能之后,第三次是在第二个消息完成之后......等等)开。)

我的消费函数是这样的:

@KafkaListener(topics = "xxx", groupId = "xxx")
public void onMessageUser(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    //some code
    //xxxx
    ack.acknowledge();
}

我的卡夫卡配置:

spring.kafka.producer.retries=5
spring.kafka.producer.acks=1
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.poll-timeout=5000
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL

我添加一个MDC(映射诊断上下文)日志,然后消费者就变对了!它停止重试 10 次。只需接收一次,处理一次。

org.slf4j.MDC.put("requestId", xxx);

魔力在哪里????


更新: 非常感谢@Artem Bilan 和@Gary Russell 的有用建议。 我已经找到原因了。这是“一些代码”片段中的错误代码:

Map<String,Object> prop = new ConcurrentHashMap<>();   
prop.put(CommonConstants.Request.REQUEST_ID,MDC.get(CommonConstants.Request.REQUEST_ID));

“MDC.get(CommonConstants.Request.REQUEST_ID)”为空值,因此会导致空指针异常:

java.lang.NullPointerException: null    
at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)     
at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)

而且由于我没有带有问题代码的 try catch 块,因此该异常不会打印在我的错误日志中,因此我很长一段时间都找不到该错误。 (我的项目中有一个通用的Spring Web异常处理程序,但是Kafka消息不经过我的Spring Web异常处理程序)

该异常导致Kafka默认重试策略为额外重试9次。

exception apache-kafka kafka-consumer-api spring-kafka mdc
1个回答
1
投票

最有可能的是,您的

some code
抛出异常,因为 MDC
requestId
不存在。

这将导致默认行为 9 次重试,每次重试之间的延迟为零。

您没有显示日志配置,但我认为该模式需要 MDC 存在。

© www.soinside.com 2019 - 2024. All rights reserved.