春季队,
下面的生产者能够成功地将值发送到kafka主题。
@Bean
Supplier<Flux<Integer>> someProducer(){
return () -> Flux.range(1, 10);
}
但是..当我们使用ReactiveKafkaSender时,我们如何获得消息的correlation id?由于flux是Spring内部订阅的,请问有什么办法获取吗?
活页夹目前不支持获取完整的
SenderResult
,只支持发送成功的RecordMetadata
。
Please open a bug on GitHub (
spring-cloud-stream
) and reference this question.
要获得
RecordMetadata
,您可以使用Supplier<Flux<Message<Integer>>>
并将消息中的senderResult
标题设置为AtomicInteger<Mono<RecordMetadata>>
;它将填充一个你可以订阅的单声道。
但是,我可以看到,如果没有关联元数据,这并没有多大用处。