当尝试在随机时间点处理从 WebClient 检索的数据时,我收到“连接在响应之前提前关闭”错误。这些页面包含大量随后处理的数据(主要是数据库更新和插入)。经过一段随机时间后,我收到错误:
**Reactor Netty:在 WebClient 响应之前连接过早关闭。**
所讨论的方法如下:
private Mono<String> migrateSomeData() {
return getAllPages()
.flatMapIterable(Page::getItems)
.filter(this::isValidItem)
.doOnNext(item -> doSomeLogging())
.filter(item-> this.checkIfAlreadyProcessed(item))
.flatMap(this::mapToDto)
.flatMap(this::persist)
.doOnNext(this::saveAsProcessed)
.collectList()
.map(this::getTotalAmountOfProcessed);
}
代码中省略了某些用于酿造的部分。 getAllPages 利用 Flux .expand 方法递归调用下一页。这已经过验证并且工作正常。需要注意的更重要的一点是,所有数据库操作都是阻塞的,因为我们没有实现响应式 jdbc。
我尝试更新网络客户端配置:
@Bean
public WebClient someClient(
ReactiveOAuth2AuthorizedClientManager clientAuthorizedClientManager) {
ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
new ServerOAuth2AuthorizedClientExchangeFilterFunction(clientAuthorizedClientManager);
oauth.setDefaultClientRegistrationId("clientId");
final int size = 16 * 1024 * 1024;
final ExchangeStrategies strategies = buildClientWithExtendedResponseSize(size);
return WebClient.builder()
.defaultHeader("Accept", MediaType.APPLICATION_JSON_VALUE)
.defaultHeader("subscription-key", someKeyValue)
.clientConnector(createWiretappedClientHttpConnector(this.getClass()))
.filter(oauth)
.exchangeStrategies(strategies)
.build();
}
private static ExchangeStrategies buildClientWithExtendedResponseSize(int size) {
return ExchangeStrategies.builder()
.codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size))
.build();
}
private ClientHttpConnector createWiretappedClientHttpConnector(Class<?> invokedClass) {
HttpClient httpClient =
HttpClient.create()
.option(ChannelOption.SO_KEEPALIVE, true)
.responseTimeout(Duration.ofMinutes(5))
.doOnConnected(
conn ->
conn.addHandlerLast(new ReadTimeoutHandler(5 * 60))
.addHandlerLast(new WriteTimeoutHandler(5 * 60)))
.compress(true)
.wiretap(
invokedClass.getCanonicalName(), LogLevel.TRACE, AdvancedByteBufFormat.TEXTUAL);
return new ReactorClientHttpConnector(httpClient);
}
我也尝试尽可能缩小此代码的范围,因此请原谅我的任何结构错误。
我已经针对wiremock中API的本地模拟实例进行了测试,问题仍然存在,因此应该排除服务器配置问题。
有没有可能是我的处理代码太慢,WebClients返回的响应没有及时消耗掉,导致webclient决定关闭连接?
抱歉回复晚了。 reactor 的 github 页面 上提出了一个关于它的问题。 这个问题似乎是“客户端发送[ACK]但没有[ACK,FIN]并保持连接打开。因此,客户端没有关闭连接,稍后又重新使用,导致此错误”。
解决方案应该是设置一个 max-idle-time (该连接在连接池中保持空闲的最长时间)。建议的值是将客户端服务器设置为
keepAliveTimeout
的值,但如果这令人困惑,请尝试一些小操作并进行操作。所以解决方案应该是这样的:
ConnectionProvider connectionProviderWithMaxIdleTime = ConnectionProvider.builder("withMaxIdleTime")
.maxIdleTime(Duration.ofSeconds(120)) //if issue persists reduce this
.build();
HttpClient httpClient = HttpClient.create(connectionProviderWithMaxIdleTime)
.option(ChannelOption.SO_KEEPALIVE, true)
{...the rest of your code...}
希望这有帮助!