发送另一个消息后如何接收单声道消息

问题描述 投票:0回答:3

如何发送buf然后收到一个消息

方法

Mono<ByteBuf> send(ByteBuf buf){
    // how to send the buf then receive a msg
}

我正在尝试通过从连接出站发送一个msg并从入站接收一个消息然后返回消息Mono来实现此方法。但我只能在当时(Publisher)方法中接收消息。它似乎无法返回到Mono数据

我试过这个。

// the connecttion has been initialized before entering this method.

        Mono.just(buf)
                .doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
                .then(connection
                        .inbound()
                        .receiveObject()
                        .single()
                        .map(RpcDataPackage.class::cast)
                        .map(RpcDataPackage::getData)
                        .map(data -> {
                            try {
                                return resCodec.decode(data);
                            } catch (IOException e) {
                                throw new RpcRequestException(e);
                            }
                        })
                );

但它会阻止,直到连接超时

我尝试了另一个代码。我添加了一个handle方法并将响应放到地图上。然后我可以在Mono.fromSupply()上获得map.get(key) != null和while循环休息。

它会阻止线程。

                .handle(((nettyInbound, nettyOutbound) -> nettyInbound
                        .receiveObject()
                        .map(RpcDataPackage.class::cast)
                        .doOnNext(pkg -> {
                            String responseKey = "a key"

                            responseMap.put(responseKey, pkg);
                        })
                        .then()))
java project-reactor reactor-netty
3个回答
0
投票

您没有指定您的期望。请参阅下面的示例,它会发送一些数据,然后接收服务器返回的内容。

    @Test
    public void test() {
        Connection connection =
                TcpClient.create()
                         .wiretap(true)
                         .host("example.com")
                         .port(80)
                         .connect()
                         .block();

        assertNotNull(connection);

        connection.outbound()
                  .sendObject(Unpooled.wrappedBuffer("test".getBytes()))
                  .then(connection.inbound()
                                  .receiveObject()
                                  .last()
                                  .doOnNext(System.out::println)
                                  .then())
                  .then()
                  .block();
    }

0
投票

我读了Mono javadoc并找到了MonoSink。

Mono.create(monoSink -> {
  // some call
})

当入站接收对象响应时,只需执行sink.success()


0
投票

您应该使用NettyOutbound :: then的组合来监听写入完成,然后使用Mono ::然后在写入后读取NettyInboud。

  Mono<String> resposeMono = TcpClient.create()
            .connect()
            .flatMap(connection -> connection.outbound().sendString(Mono.just("Hello!"))
                    .then()
                    .then(connection.inbound().receive().aggregate().asString())
                    .doOnTerminate(connection::dispose));

这将写成“你好!”到输出,从输入读取所有字节作为字符串然后处置连接。

© www.soinside.com 2019 - 2024. All rights reserved.