仅当HttpClient阻止时,Reactor Netty在HttpClient订阅时未获得HttpServer响应

问题描述 投票:0回答:1

使用github中的示例HttpClient / HttpServer示例,当HttpClient订阅而不是阻止时,我试图从示例HttpServer打印响应。这是使用的示例HttpServer:

public class Server {

    public static void main(String[] args) {

        DisposableServer server =
                HttpServer.create()
                        .host("10.0.0.19")
                        .port(61005)
                        .route(routes ->
                                routes.get("/hello",
                                        (request, response) -> response.sendString(Mono.just("Hello World!")))
                                    .post("/echo",
                                        (request, response) -> response.sendString(Mono.just("Hello World!"))))
                        .bindNow();

        server.onDispose()
                .block();
    }
}

这里是HttpClient和HttpServer使用的Maven依赖项:

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.9.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>

如果HttpClient阻止,则请求和响应可以像下面的阻止代码一样正常工作:

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) {

        String responseStr = HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .block();

        System.out.println(responseStr);
    }
}

但是HttpClient订阅了onSuccess,onError和onCompletion回调时,没有响应,也就是说,都不执行onSuccess,onError或onCompletion:

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) {

        Consumer<String> onSuccess = (String response) -> {
            log.info("response in onSuccess: "+response);

        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
        };

        Runnable onCompletion = () -> {
            System.out.println("Message Completed");

        };

            HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .subscribe(onSuccess, onError, onCompletion);
    }
}

我无法找到使用subscribe()的示例。在上面的使用subscribe()的HttpClient的示例中,HttpServer似乎没有返回响应。任何煽动这种现象似乎会发生的原因都会有所帮助。

java netty reactive-programming reactor-netty
1个回答
0
投票

在Reactor Netty示例中,使用.block是为了使main线程保持活动状态,因为我们不希望该线程退出。使用.subscribe时,您需要另一种机制来保持main线程处于活动状态并防止其退出。在您的示例中发生的是main线程退出并且您看不到结果。

例如,您可以使用CountDownLatch

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) throws Exception {

        CountDownLatch latch = new CountDownLatch(1);

        Consumer<String> onSuccess = (String response) -> {
            log.info("response in onSuccess: "+response);

        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
            latch.countDown();
        };

        Runnable onCompletion = () -> {
            System.out.println("Message Completed");
            latch.countDown();
        };

            HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(ByteBufFlux.fromString(Mono.just("Hello")))
                .responseContent()
                .aggregate()
                .asString()
                .subscribe(onSuccess, onError, onCompletion);

            latch.await();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.