Reactor Netty:在使用 WebClient 进行响应之前连接过早关闭

问题描述 投票:0回答:1

当尝试在随机时间点处理从 WebClient 检索的数据时,我收到“连接在响应之前提前关闭”错误。这些页面包含大量随后处理的数据(主要是数据库更新和插入)。经过一段随机时间后,我收到错误:

**Reactor Netty:在 WebClient 响应之前连接过早关闭。**

所讨论的方法如下:

  private Mono<String> migrateSomeData() {
    return getAllPages()
        .flatMapIterable(Page::getItems)
        .filter(this::isValidItem)
        .doOnNext(item -> doSomeLogging())
        .filter(item-> this.checkIfAlreadyProcessed(item))
        .flatMap(this::mapToDto)
        .flatMap(this::persist)
        .doOnNext(this::saveAsProcessed)
        .collectList()
        .map(this::getTotalAmountOfProcessed);
  }

代码中省略了某些用于酿造的部分。 getAllPages 利用 Flux .expand 方法递归调用下一页。这已经过验证并且工作正常。需要注意的更重要的一点是,所有数据库操作都是阻塞的,因为我们没有实现响应式 jdbc。

我尝试更新网络客户端配置:

@Bean
  public WebClient someClient(
      ReactiveOAuth2AuthorizedClientManager clientAuthorizedClientManager) {

    ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
        new ServerOAuth2AuthorizedClientExchangeFilterFunction(clientAuthorizedClientManager);
    oauth.setDefaultClientRegistrationId("clientId");

    final int size = 16 * 1024 * 1024;
    final ExchangeStrategies strategies = buildClientWithExtendedResponseSize(size);

    return WebClient.builder()
        .defaultHeader("Accept", MediaType.APPLICATION_JSON_VALUE)
        .defaultHeader("subscription-key", someKeyValue)
        .clientConnector(createWiretappedClientHttpConnector(this.getClass()))
        .filter(oauth)
        .exchangeStrategies(strategies)
        .build();
  }

  private static ExchangeStrategies buildClientWithExtendedResponseSize(int size) {
    return ExchangeStrategies.builder()
        .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size))
        .build();
  }

private ClientHttpConnector createWiretappedClientHttpConnector(Class<?> invokedClass) {
    HttpClient httpClient =
        HttpClient.create()
            .option(ChannelOption.SO_KEEPALIVE, true)
            .responseTimeout(Duration.ofMinutes(5))
            .doOnConnected(
                conn ->
                    conn.addHandlerLast(new ReadTimeoutHandler(5 * 60))
                        .addHandlerLast(new WriteTimeoutHandler(5 * 60)))
            .compress(true)
            .wiretap(
                invokedClass.getCanonicalName(), LogLevel.TRACE, AdvancedByteBufFormat.TEXTUAL);
    return new ReactorClientHttpConnector(httpClient);
  }

我也尝试尽可能缩小此代码的范围,因此请原谅我的任何结构错误。

我已经针对wiremock中API的本地模拟实例进行了测试,问题仍然存在,因此应该排除服务器配置问题。

有没有可能是我的处理代码太慢,WebClients返回的响应没有及时消耗掉,导致webclient决定关闭连接?

java spring-boot spring-webflux project-reactor reactor-netty
1个回答
0
投票

抱歉回复晚了。 reactor 的 github 页面 上提出了一个关于它的问题。 这个问题似乎是“客户端发送[ACK]但没有[ACK,FIN]并保持连接打开。因此,客户端没有关闭连接,稍后又重新使用,导致此错误”。

解决方案应该是设置一个 max-idle-time (该连接在连接池中保持空闲的最长时间)。建议的值是将客户端服务器设置为

keepAliveTimeout
的值,但如果这令人困惑,请尝试一些小操作并进行操作。所以解决方案应该是这样的:

ConnectionProvider connectionProviderWithMaxIdleTime = ConnectionProvider.builder("withMaxIdleTime")
                .maxIdleTime(Duration.ofSeconds(120)) //if issue persists reduce this
                .build();
HttpClient httpClient = HttpClient.create(connectionProviderWithMaxIdleTime)
                .option(ChannelOption.SO_KEEPALIVE, true)
                {...the rest of your code...}

希望这有帮助!

© www.soinside.com 2019 - 2024. All rights reserved.