WebClient 配置(最小化可重现用例,使用不同的持续时间进行测试,没有效果):
public WebClient createWebClient() {
ConnectionProvider provider = ConnectionProvider.builder("WebClientProvider")
.maxConnections(2)
.maxIdleTime(Duration.ofSeconds(10))
.maxLifeTime(Duration.ofSeconds(20))
.pendingAcquireTimeout(Duration.ofSeconds(10))
.evictInBackground(Duration.ofSeconds(10))
.build();
HttpClient httpClient = HttpClient.create(provider).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
ReactorClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(httpClient);
return WebClient.builder()
.clientConnector(reactorClientHttpConnector)
.build();
}
客户端的使用,作为公共云服务提供商的 GET:
public Flux<SomeClass> doGetSomething() {
return webClient.get()
.uri(buildUri())
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(byte[].class)
.flatMapMany(byteArray -> Flux.fromArray(byteArrayToModel(byteArray, SomeClass[].class)))
.onErrorResume(e -> {
return Flux.error(new RuntimeException("Failed to call get", e));
});
}
例外:
org.springframework.web.reactive.function.client.WebClientRequestException: Connection prematurely closed BEFORE response; nested exception is reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoLift] :
reactor.core.publisher.Mono.error(Mono.java:330)
org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.wrapException(ExchangeFunctions.java:141)
Error has been observed at the following site(s):
*__________Mono.error ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.wrapException(ExchangeFunctions.java:141)
*__Mono.onErrorResume ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:106)
|_ Mono.map ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:107)
*________Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$ofResponseProcessor$4(ExchangeFilterFunction.java:96)
*________Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$ofRequestProcessor$3(ExchangeFilterFunction.java:85)
|_ checkpoint ⇢ Request to GET https://***********************.com/someendpoint*********************** [DefaultWebClient]
|_ Mono.switchIfEmpty ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchange$7(DefaultWebClient.java:433)
*__________Mono.defer ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:430)
|_ Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.bodyToMono(DefaultWebClient.java:540)
|_ Mono.flatMapMany ⇢ at com.mycode(SomeClass.java:57)
Original Stack Trace:
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerError(FluxConcatMap.java:309)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onError(FluxConcatMap.java:875)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:194)
at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:384)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:670)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onUncaughtException(DefaultPooledConnectionProvider.java:202)
at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onUncaughtException(DefaultPooledConnectionProvider.java:450)
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:294)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:326)
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1074)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
测试机制: 通过失眠向反应式网络应用程序发出 GET 请求(连续单击“发送”按钮,直到失败),请求间歇性地失败,以随机的时间间隔,没有明显的(对我来说)原因。
有什么想法可以解决这个问题吗?
版本:
org.springframework.boot:spring-boot-starter-reactor-netty:jar:2.6.6
io.projectreactor.netty:reactor-netty-http:jar:1.0.17
io.projectreactor.netty:reactor-netty-core:jar:1.0.17
io.projectreactor:reactor-core:jar:3.4.16
io.netty:netty-codec:jar:4.1.75
org.springframework:spring-webflux:jar:5.3.18
您可以通过为每个请求创建新连接来解决此问题:
HttpClient httpClient = HttpClient.newConnection().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
默认情况下,Netty 会池化连接并在响应标头不包含以下内容时使它们保持活动状态:
Connection: close
WebClient 和处理请求的服务器之间的任何中间网络组件(负载均衡器、代理等)都可以关闭请求之间的连接。如果发生这种情况,您将得到
Connection prematurely closed BEFORE response
例外。
如果您需要连接池,您也可以尝试发送 TCP 保持活动请求,但仍然存在相同连接的两个请求之间连接将被关闭的风险。
请参阅 Netty 文档
HttpClient client = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
// The options below are available only when NIO transport (Java 11) is used
// on Mac or Linux (Java does not currently support these extended options on Windows)
// https://bugs.openjdk.java.net/browse/JDK-8194298
//.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), 300)
//.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), 60)
//.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), 8);
// The options below are available only when Epoll transport is used
.option(EpollChannelOption.TCP_KEEPIDLE, 300)
.option(EpollChannelOption.TCP_KEEPINTVL, 60)
.option(EpollChannelOption.TCP_KEEPCNT, 8);