我试图使用ReactorNettyWebSocketClient
从websocket读取数据,但我只是无法连接到它的api。问题是我收到的所有数据都可以在lambda样式的websockethandler(#1)的内部部分获得,但我想在client.execute(..)(#2)上的.subscribe之后将它们提供给订阅者。
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create(URL),
session -> session.send(
Mono.just(session.textMessage(pairRqStr)))
.thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(this::toResp)
.onErrorContinue((throwable, o) -> throwable.getMessage())
)
.log() // #1
.then()
)
.log()
.subscribe(System.out::println); // #2
对此有点失落和新意,所以,请指导我。
要发送或接收消息,您必须先连接到该频道。这就是为什么client.execute
返回Mono<Void>
,这意味着它不返回数据,它只是表示handskahe的完成或失败。如果它会返回例如。 Flux<WebSocketMessage>
,你怎么知道handskake是否成功完成了?
如果要访问lambda外部的通道,则无法通过实现处理程序方法来执行此操作:
Consumer<WebSocketMessage> printingConsumer = webSocketMessage -> System.out.println(webSocketMessage.getPayloadAsText());
client.execute(URI.create(URL), session -> handle(session, printingConsumer));
}
public Mono<Void> handle(WebSocketSession session, Consumer<WebSocketMessage> consumer) {
return session.receive()
.doOnNext(consumer::accept)
.then();
}
这是我最后做的,但仍然不喜欢它:
private Flux<WebSocketMessage> requestData(String req) {
WebSocketClient client = new ReactorNettyWebSocketClient();
return ConnectableFlux.create(sub -> {
client.execute(
URI.create(URL),
session -> session.send(
Mono.just(session.textMessage(req)))
.thenMany(session.receive().doOnNext(sub::next))
.then()
)
.log()
.subscribe();
});
}