如何在 Spring Cloud Stream 中获取关联 ID

问题描述 投票:0回答:1

春季队,

下面的生产者能够成功地将值发送到kafka主题。

@Bean
Supplier<Flux<Integer>> someProducer(){
    return () -> Flux.range(1, 10);
}

但是..当我们使用ReactiveKafkaSender时,我们如何获得消息的correlation id?由于flux是Spring内部订阅的,请问有什么办法获取吗?

spring-cloud-stream
1个回答
1
投票

活页夹目前不支持获取完整的

SenderResult
,只支持发送成功的
RecordMetadata

Please open a bug on GitHub (

spring-cloud-stream
) and reference this question.

要获得

RecordMetadata
,您可以使用
Supplier<Flux<Message<Integer>>>
并将消息中的
senderResult
标题设置为
AtomicInteger<Mono<RecordMetadata>>
;它将填充一个你可以订阅的单声道。

这里有一个测试:https://github.com/spring-cloud/spring-cloud-stream/blob/29c3cd7cddf9b853c57fca2b2118f1b64e5dde30/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test /java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java#L315-L323

但是,我可以看到,如果没有关联元数据,这并没有多大用处。

© www.soinside.com 2019 - 2024. All rights reserved.