我有一个应用程序,我想在其中手动执行Kafka消息中的(n)ack。根据Spring Cloud文档,应使用autoCommitOffset
spring cloud documentation
但是,在我的应用程序中,即使定义了这样的属性,标题KafkaHeaders.ACKNOWLEDGMENT
仍然为空。
这是我的配置内容
spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false
和我的消费者:
@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
MyTopic payload = message.getPayload();
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}
我正在将Java 13与Spring Boot 2.2.5.RELEASE和Spring Cloud Hoxton.SR1一起使用
感谢您的任何帮助。
我刚刚复制了您的属性,对我来说效果很好...
kafka_acknowledgment = ConsumerRecord的确认(主题= MyInputTopic,分区= 0,leaderEpoch = 0,偏移量= 0,CreateTime = 1589488691039,序列化密钥大小= -1,序列化值大小= 3,标题= RecordHeaders(headers = [],isReadOnly = false),key = null,value = [B @ 572887c3),contentType = application / json,kafka_groupId = myConsumerGroup}]
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(Message<String> in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("MyInputTopic", "foo".getBytes());
};
}
}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.input.destination=MyInputTopic
spring.cloud.stream.bindings.input.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false