如何使用Spring Reactive WebSocket并将其转换为Flux流?

问题描述 投票:0回答:1

Spring 文档

上有一些 
WebSocketClient 示例:

WebSocketClient client = new ReactorNettyWebSocketClient(); client.execute("ws://localhost:8080/echo"), session -> {...}).blockMillis(5000);

我不确定如何处理传入数据流? 在那个街区内

{...}

我的意思是:如何过滤传入的数据并将其转换为 Flux?

这就是我想要得到的。

@GetMapping("/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<MyRecourse> getStreaming() { // get some data from WebSocket (CoinCap service). // Transform that data into MyRecourse object // Return stream to a client }
    
spring-websocket spring-webflux
1个回答
3
投票
只需查看

WebSocketSession

 lambda 的 
WebSocketHandler.handle()
 参数即可:

/** * Get the flux of incoming messages. */ Flux<WebSocketMessage> receive();
请参阅

Spring WebFlux Workshop了解更多信息。

更新

我们来试试这个吧!

Mono<Void> sessionMono = client.execute(new URI("ws://localhost:8080/echo"), session -> Mono.empty() .subscriberContext(Context.of(WebSocketSession.class, session)) .then()); return sessionMono .thenMany( Mono.subscriberContext() .flatMapMany(c -> c .get(WebSocketSession.class) .receive())) .map(WebSocketMessage::getPayloadAsText);

更新2

或另一种选择,但订阅被阻止:

EmitterProcessor<String> output = EmitterProcessor.create(); client.execute(new URI("ws://localhost:8080/echo"), session -> session.receive() .map(WebSocketMessage::getPayloadAsText) .subscribeWith(output) .then()) .block(Duration.ofMillis(5000)); return output;

更新3

关于此事的工作 Spring Boot 应用程序:

https://github.com/artembilan/sandbox/tree/master/websocket-over-webflux

主要代码如下:

EmitterProcessor<String> output = EmitterProcessor.create(); Mono<Void> sessionMono = client.execute(new URI("ws://localhost:8080/echo"), session -> session.receive() .map(WebSocketMessage::getPayloadAsText) .subscribeWith(output) .then()); return output.doOnSubscribe(s -> sessionMono.subscribe());
    
© www.soinside.com 2019 - 2024. All rights reserved.