Spring reactive: "Connection prematurely closed BEFORE response" with Reactor Netty WebClient HttpClient


WebClient configuration (minimal reproducible case; I tested with different durations, with no effect):

    public WebClient createWebClient() {
        ConnectionProvider provider = ConnectionProvider.builder("WebClientProvider")
                .maxConnections(2)
                .maxIdleTime(Duration.ofSeconds(10))
                .maxLifeTime(Duration.ofSeconds(20))
                .pendingAcquireTimeout(Duration.ofSeconds(10))
                .evictInBackground(Duration.ofSeconds(10))
                .build();

        HttpClient httpClient = HttpClient.create(provider).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);

        ReactorClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(httpClient);

        return WebClient.builder()
            .clientConnector(reactorClientHttpConnector)
            .build();
    }

Client usage, a GET against a public cloud service provider:

    public Flux<SomeClass> doGetSomething() {
        return webClient.get()
                .uri(buildUri())
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(byte[].class)
                .flatMapMany(byteArray -> Flux.fromArray(byteArrayToModel(byteArray, SomeClass[].class)))
                .onErrorResume(e -> {
                    return Flux.error(new RuntimeException("Failed to call get", e));
                });
    }

Exception:

org.springframework.web.reactive.function.client.WebClientRequestException: Connection prematurely closed BEFORE response; nested exception is reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
    at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLift] :
    reactor.core.publisher.Mono.error(Mono.java:330)
    org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.wrapException(ExchangeFunctions.java:141)
Error has been observed at the following site(s):
    *__________Mono.error ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.wrapException(ExchangeFunctions.java:141)
    *__Mono.onErrorResume ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:106)
    |_           Mono.map ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:107)
    *________Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$ofResponseProcessor$4(ExchangeFilterFunction.java:96)
    *________Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$ofRequestProcessor$3(ExchangeFilterFunction.java:85)
    |_         checkpoint ⇢ Request to GET https://***********************.com/someendpoint*********************** [DefaultWebClient]
    |_ Mono.switchIfEmpty ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchange$7(DefaultWebClient.java:433)
    *__________Mono.defer ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:430)
    |_       Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.bodyToMono(DefaultWebClient.java:540)
    |_   Mono.flatMapMany ⇢ at com.mycode(SomeClass.java:57)
Original Stack Trace:
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerError(FluxConcatMap.java:309)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onError(FluxConcatMap.java:875)
        at reactor.core.publisher.Operators.error(Operators.java:198)
        at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
        at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
        at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
        at io.opentracing.contrib.reactor.TracedSubscriber.lambda$onError$4(TracedSubscriber.java:79)
        at io.opentracing.contrib.reactor.TracedSubscriber.withActiveSpan(TracedSubscriber.java:95)
        at io.opentracing.contrib.reactor.TracedSubscriber.onError(TracedSubscriber.java:79)
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:194)
        at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:384)
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:670)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onUncaughtException(DefaultPooledConnectionProvider.java:202)
        at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onUncaughtException(DefaultPooledConnectionProvider.java:450)
        at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:294)
        at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
        at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:326)
        at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
        at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1074)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)

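Test procedure: I send GET requests to the reactive web application from Insomnia (clicking "Send" repeatedly until one fails). Requests fail intermittently, at random intervals, for no reason that is apparent to me.

Any ideas on how to fix this?

Versions: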

org.springframework.boot:spring-boot-starter-reactor-netty:jar:2.6.6
io.projectreactor.netty:reactor-netty-http:jar:1.0.17
io.projectreactor.netty:reactor-netty-core:jar:1.0.17
io.projectreactor:reactor-core:jar:3.4.16
io.netty:netty-codec:jar:4.1.75
org.springframework:spring-webflux:jar:5.3.18

P.S. I stepped through the library internals in a debugger and ended up in reactor-netty-http.

java spring spring-webflux spring-webclient reactor-netty
1 Answer

You can work around this by creating a new connection for each request:

HttpClient httpClient = HttpClient.newConnection().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);

By default, Reactor Netty pools connections and keeps them alive as long as the response headers do not contain:

Connection: close

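Any intermediate network component between the WebClient and the server handling the request (load balancer, proxy, etc.) can close the connection between requests. If that happens, you will get a

Connection prematurely closed BEFORE response

exception.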
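
To make that failure mode concrete, here is a minimal sketch that uses plain java.net sockets instead of Reactor Netty. The class name StaleConnectionDemo and its method are hypothetical and not part of any library. A local server plays the intermediary that drops an idle connection, and the client then reuses that connection for a request, much as a pool would.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; plain sockets stand in for a pooled connection.
class StaleConnectionDemo {

    /**
     * Returns true if a request written on a connection that the peer has
     * already closed ends without a single response byte
     * (end-of-stream or a connection reset).
     */
    static boolean closedBeforeResponse() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            // The "intermediary": accepts the connection, then drops it while idle.
            Thread peer = new Thread(() -> {
                try (Socket accepted = server.accept()) {
                    // Closed on leaving this block, without reading or responding.
                } catch (IOException ignored) {
                    // Not relevant for the demo.
                }
            });
            peer.start();

            try (Socket pooled = new Socket(loopback, server.getLocalPort())) {
                peer.join(); // the peer has closed; the client code has not noticed yet
                try {
                    OutputStream out = pooled.getOutputStream();
                    out.write("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"
                            .getBytes(StandardCharsets.US_ASCII));
                    out.flush();
                    // -1 means the stream ended before any response byte arrived.
                    return pooled.getInputStream().read() == -1;
                } catch (IOException resetByPeer) {
                    // A reset also means no response was received.
                    return true;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("closed before response: " + closedBeforeResponse());
    }
}
```

The client only learns that the connection is gone when it tries to use it, which is why the error shows up on otherwise valid requests at seemingly random times.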

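If you need connection pooling, you can also try enabling TCP keep-alive probes, but there is still a risk that the connection will be closed between two requests on the same connection.

See the Reactor Netty documentation: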

HttpClient client = HttpClient.create()
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) 
    .option(ChannelOption.SO_KEEPALIVE, true)            
    // The options below are available only when NIO transport (Java 11) is used
    // on Mac or Linux (Java does not currently support these extended options on Windows)
    // https://bugs.openjdk.java.net/browse/JDK-8194298
    //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), 300)
    //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), 60)
    //.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), 8);
    // The options below are available only when Epoll transport is used
    .option(EpollChannelOption.TCP_KEEPIDLE, 300)        
    .option(EpollChannelOption.TCP_KEEPINTVL, 60)        
    .option(EpollChannelOption.TCP_KEEPCNT, 8);
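
Whether the extended keep-alive options are available depends on the JVM and operating system, as the comments in the snippet above note. The following sketch, assuming Java 11 or later, checks this on a plain java.net.Socket using the same values (300, 60, 8). The KeepAliveProbe class and its methods are hypothetical helpers, not part of Reactor Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.util.LinkedHashMap;
import java.util.Map;
import jdk.net.ExtendedSocketOptions;

// Hypothetical helper; reports which keep-alive options this JVM/OS accepts.
class KeepAliveProbe {

    /** Maps each option name to whether it could be applied on this platform. */
    static Map<String, Boolean> probe() {
        Map<String, Boolean> applied = new LinkedHashMap<>();
        try (Socket socket = new Socket()) { // unconnected; options can still be set
            socket.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            applied.put("SO_KEEPALIVE", socket.getOption(StandardSocketOptions.SO_KEEPALIVE));
            applied.put("TCP_KEEPIDLE", trySet(socket, ExtendedSocketOptions.TCP_KEEPIDLE, 300));
            applied.put("TCP_KEEPINTERVAL", trySet(socket, ExtendedSocketOptions.TCP_KEEPINTERVAL, 60));
            applied.put("TCP_KEEPCOUNT", trySet(socket, ExtendedSocketOptions.TCP_KEEPCOUNT, 8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return applied;
    }

    // Sets the option only if the socket reports it as supported.
    private static boolean trySet(Socket socket, SocketOption<Integer> option, int value)
            throws IOException {
        if (!socket.supportedOptions().contains(option)) {
            return false;
        }
        socket.setOption(option, value);
        return true;
    }

    public static void main(String[] args) {
        probe().forEach((name, ok) -> System.out.println(name + " applied: " + ok));
    }
}
```

If the extended options report false, only SO_KEEPALIVE with the operating system's default timings is in effect, and those defaults may be far longer than an intermediary's idle timeout.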