使用 reactor-netty http 客户端的顺序调用似乎释放了第一个响应的 ByteBuf

问题描述 投票:0回答:0

我需要按顺序进行两个 http 调用,一个依赖于另一个,在另一种情况下进行两个并行调用并组合它们的结果。

对于案例 1:相互依赖的顺序调用,我正在使用“zipWhen”将我的结果组合在一起

对于案例 2:异步并行调用,我使用“zip”订阅两个 http 请求的发布者并合并结果。

在这两种情况下,我发现包含第一个响应的 ByteBuf 在进行第二次调用时自动丢失引用。我怀疑这与 responseSingle 有关,但我不确定为什么会这样。

我在下面包含了 Case1(顺序)的代码示例:

package io.spring.workshop.reactornetty.http;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;

import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;


public class HttpCompressionTests {

    @Test
    public void httpCompressionTest() {
        DisposableServer server =
                HttpServer.create()   // Prepares a HTTP server for configuration.
                          .port(0)    // Configures the port number as zero, this will let the system pick up
                                      // an ephemeral port when binding the server.
                          .handle((req, res) -> res.sendString(Mono.just("compressed response")))
                          .compress(true) // Enables compression.
                          .wiretap(true)  // Applies a wire logger configuration.
                          .bindNow(); // Starts the server in a blocking fashion, and waits for it to finish initializing.

        assertNotNull(server);

        String response =
                HttpClient.create()            // Prepares a HTTP client for configuration.
                          .port(server.port()) // Obtains the server's port and provides it as a port to which this
                                               // client should connect.
                          .compress(true)          // Enables compression.
                          .wiretap(true)           // Applies a wire logger configuration.
                          .get()               // Specifies that GET method will be used.
                          .uri("/test")        // Specifies the path.
                          .responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER)))   // Receives the response body.
//                          .aggregate()
//                          .asString()
                          .log("http-client")
                          .zipWhen(string1 ->
                              HttpClient.create()            // Prepares a HTTP client for configuration.
                                      .port(server.port()) // Obtains the server's port and provides it as a port to which this
                                      // client should connect.
                                      .compress(true)          // Enables compression.
                                      .wiretap(true)           // Applies a wire logger configuration.
                                      .get()               // Specifies that GET method will be used.
                                      .uri("/test")        // Specifies the path.
                                      .responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER))),   // Receives the response body.
                                      (string1, string2) -> StandardCharsets.UTF_8.decode(string1.getT2().nioBuffer()).toString()
                          ).block();

        assertEquals("compressed response", response);

        server.disposeNow();          // Stops the server and releases the resources.
    }
}

任何帮助将不胜感激。谢谢!!

我尝试保留我的第一个响应 ByteBuf,这似乎可行,但我将不得不手动减少引用计数,我认为这可能会导致 Mem 泄漏。

java project-reactor reactor-netty
© www.soinside.com 2019 - 2024. All rights reserved.