如何发送buf然后收到一个消息
方法
Mono<ByteBuf> send(ByteBuf buf){
// how to send the buf then receive a msg
}
我正在尝试通过从连接出站发送一个msg并从入站接收一个消息然后返回消息Mono来实现此方法。但我只能在当时(Publisher)方法中接收消息。它似乎无法返回到Mono数据
我试过这个。
// the connecttion has been initialized before entering this method.
Mono.just(buf)
.doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
.then(connection
.inbound()
.receiveObject()
.single()
.map(RpcDataPackage.class::cast)
.map(RpcDataPackage::getData)
.map(data -> {
try {
return resCodec.decode(data);
} catch (IOException e) {
throw new RpcRequestException(e);
}
})
);
但它会阻止,直到连接超时
我尝试了另一个代码。我添加了一个handle
方法并将响应放到地图上。然后我可以在Mono.fromSupply()
上获得map.get(key) != null
和while循环休息。
它会阻止线程。
.handle(((nettyInbound, nettyOutbound) -> nettyInbound
.receiveObject()
.map(RpcDataPackage.class::cast)
.doOnNext(pkg -> {
String responseKey = "a key"
responseMap.put(responseKey, pkg);
})
.then()))
您没有指定您的期望。请参阅下面的示例,它会发送一些数据,然后接收服务器返回的内容。
@Test
public void test() {
Connection connection =
TcpClient.create()
.wiretap(true)
.host("example.com")
.port(80)
.connect()
.block();
assertNotNull(connection);
connection.outbound()
.sendObject(Unpooled.wrappedBuffer("test".getBytes()))
.then(connection.inbound()
.receiveObject()
.last()
.doOnNext(System.out::println)
.then())
.then()
.block();
}
我读了Mono javadoc并找到了MonoSink。
Mono.create(monoSink -> {
// some call
})
当入站接收对象响应时,只需执行sink.success()
您应该使用NettyOutbound :: then的组合来监听写入完成,然后使用Mono ::然后在写入后读取NettyInboud。
Mono<String> resposeMono = TcpClient.create()
.connect()
.flatMap(connection -> connection.outbound().sendString(Mono.just("Hello!"))
.then()
.then(connection.inbound().receive().aggregate().asString())
.doOnTerminate(connection::dispose));
这将写成“你好!”到输出,从输入读取所有字节作为字符串然后处置连接。