Spring Cloud Stream访问原始流

问题描述 投票:0回答:1

这是我的用例,用户使用websocket(带有订阅的GraphQl)订阅我的流,我需要通过用户ID返回org.reactivestreams.Publisher的实例(应该是我的kafka主题订阅)过滤消息。

为了说明,像这样:

/ **
  *  I don´t know how to get a instance of Publisher<Balance>
  *  It should be a consumer from a kafka topic
  */
fun balance(myStream: Publisher<Balance>, userId: String): Publisher<Balance> {
    return myStream.filter { it.userId == userId }
}
spring spring-cloud-stream spring-reactive
1个回答
0
投票
也许您需要编写一个Spring Cloud Stream使用者,然后以编程方式将其发布到WebSocket。类似于

public Consumer<Flux<Balance>> myStream() { //filter here and then publish to websocket. }

Here是一个WebSocket接收器实现的示例,您可以将其用作指导,但这不是被动的。 
© www.soinside.com 2019 - 2024. All rights reserved.