Acknowledgement.acknowledge() 在 spring-kafka @KafkaListener 中抛出异常

问题描述 投票:0回答:2

当我将enable.auto.commit 设置为 false 并尝试使用基于注释的 spring-kafka @KafkaListener 手动提交偏移量时,我收到 org.springframework.kafka.listener.ListenerExecutionFailedException: Listener 方法无法使用传入消息调用

我有一个非常简单的代码如下:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
    System.out.println("Received Messasge in group 'foo': " + message);

    // TODO: Do something with the message
}

当我从生产者发送消息时,出现以下异常:

org.springframework.kafka.listener.ListenerExecutionFailedException:无法使用传入消息调用侦听器方法。

端点处理程序详细信息:

方法 [public void com.****.*****.********.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]

Bean [com.****.*****.********.KafkaMessageListener@5856dbe4];嵌套异常是 org.springframework.messaging.converter.MessageConversionException:无法处理消息;嵌套异常是 org.springframework.messaging.converter.MessageConversionException:无法从 [java.lang.String] 转换为 [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey =null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}], failedMessage=GenericMessage [有效负载=测试,标头={kafka_offset=57,kafka_receivedMessageKey=null,kafka_receivedPartitionId=0,kafka_receivedTopic=demotopic}]

请帮忙。短暂性脑缺血发作。

spring-boot apache-kafka spring-kafka
2个回答
28
投票

您必须将容器工厂的

containerProperties
ackMode 设置为
MANUAL
MANUAL_IMMEDIATE
才能获取
Acknowledgment
对象。

对于其他 ack 模式,容器负责提交偏移量。

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)

或者如果使用 Spring Boot 则设置

....ackMode
属性


0
投票

通过将以下内容应用于 application.properties 在 Spring Boot 2.4.3 中解决了此问题

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

根据您的用例将确认模式设置为手动或手动_立即。

© www.soinside.com 2019 - 2024. All rights reserved.