在带有春季云流的kafka中自动提交

问题描述 投票:1回答:1

我有一个应用程序,我想在其中手动执行Kafka消息中的(n)ack。根据Spring Cloud文档,应使用autoCommitOffset spring cloud documentation

但是,在我的应用程序中,即使定义了这样的属性,标题KafkaHeaders.ACKNOWLEDGMENT仍然为空。

这是我的配置内容

spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false

和我的消费者:

@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
    MyTopic payload = message.getPayload();
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}

我正在将Java 13与Spring Boot 2.2.5.RELEASE和Spring Cloud Hoxton.SR1一起使用

感谢您的任何帮助。

java spring-boot apache-kafka spring-cloud spring-cloud-stream
1个回答
0
投票

我刚刚复制了您的属性,对我来说效果很好...

kafka_acknowledgment = ConsumerRecord的确认(主题= MyInputTopic,分区= 0,leaderEpoch = 0,偏移量= 0,CreateTime = 1589488691039,序列化密钥大小= -1,序列化值大小= 3,标题= RecordHeaders(headers = [],isReadOnly = false),key = null,value = [B @ 572887c3),contentType = application / json,kafka_groupId = myConsumerGroup}]
@SpringBootApplication @EnableBinding(Sink.class) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @StreamListener(Sink.INPUT) public void listen(Message<String> in) { System.out.println(in); } @Bean public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) { return args -> { template.send("MyInputTopic", "foo".getBytes()); }; } }
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.input.destination=MyInputTopic
spring.cloud.stream.bindings.input.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
© www.soinside.com 2019 - 2024. All rights reserved.