我有一个 Kafka 监听器,我向该主题发送一条 Kafka 消息,我的监听器接收了 10 次(第二次是在第一个消息完成其处理功能之后,第三次是在第二个消息完成之后......等等)开。)
我的消费函数是这样的:
@KafkaListener(topics = "xxx", groupId = "xxx")
public void onMessageUser(ConsumerRecord<?, ?> record, Acknowledgment ack) {
//some code
//xxxx
ack.acknowledge();
}
我的卡夫卡配置:
spring.kafka.producer.retries=5
spring.kafka.producer.acks=1
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.poll-timeout=5000
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL
我添加一个MDC(映射诊断上下文)日志,然后消费者就变对了!它停止重试 10 次。只需接收一次,处理一次。
org.slf4j.MDC.put("requestId", xxx);
魔力在哪里????
更新: 非常感谢@Artem Bilan 和@Gary Russell 的有用建议。 我已经找到原因了。这是“一些代码”片段中的错误代码:
Map<String,Object> prop = new ConcurrentHashMap<>();
prop.put(CommonConstants.Request.REQUEST_ID,MDC.get(CommonConstants.Request.REQUEST_ID));
“MDC.get(CommonConstants.Request.REQUEST_ID)”为空值,因此会导致空指针异常:
java.lang.NullPointerException: null
at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
而且由于我没有带有问题代码的 try catch 块,因此该异常不会打印在我的错误日志中,因此我很长一段时间都找不到该错误。 (我的项目中有一个通用的Spring Web异常处理程序,但是Kafka消息不经过我的Spring Web异常处理程序)
该异常导致Kafka默认重试策略为额外重试9次。
最有可能的是,您的
some code
抛出异常,因为 MDC requestId
不存在。
这将导致默认行为 9 次重试,每次重试之间的延迟为零。
您没有显示日志配置,但我认为该模式需要 MDC 存在。