TCP 连接在闲置 300 秒后关闭

问题描述 投票:0回答:1

我已经为依赖于 TCP/IP 协议的面向流的套接字实现了 TCPClient。我已经使用了 Spring Integration。

我的期望是创建 9 个连接的池。流量将仅由这 9 个 TCP 连接处理。但我意识到连接在 300 秒后就被关闭了。 (日志的最后三行)这会导致为新请求建立新的 TCP 连接。客户(我们的 TCP 服务器)说他们从不关闭连接。客户报告说我的应用程序正在尝试建立比分配数量更多的连接(即 9)。

这是配置。

    @Bean
    public AbstractClientConnectionFactory emirClientConnectionFactory() {
        TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("censored-host", 9999);
        tcpNetClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
        tcpNetClientConnectionFactory.setDeserializer(new EmirCustomStxHeaderLengthSerializer());
        tcpNetClientConnectionFactory.setSerializer(new EmirCustomStxHeaderLengthSerializer());
        tcpNetClientConnectionFactory.setSingleUse(false);
        tcpNetClientConnectionFactory.setSoKeepAlive(true);
        TcpSocketSupport emirTcpSocketSupport = new emirTcpSocketSupport();
        tcpNetClientConnectionFactory.setTcpSocketSupport(emirTcpSocketSupport);
        return tcpNetClientConnectionFactory;
    }


    @Bean
    public AbstractClientConnectionFactory emirTcpCachedClientConnectionFactory() {
        CachingClientConnectionFactory cachingConnFactory = new CachingClientConnectionFactory(emirClientConnectionFactory(), 9);
        cachingConnFactory.setSingleUse(false);
        cachingConnFactory.setLeaveOpen(true);
        cachingConnFactory.setSoKeepAlive(true);
        this.cachingClientConnectionFactory = cachingConnFactory;
        return cachingConnFactory;
    }

    @Bean
    public MessageChannel emirOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(50);
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);
        retryAdvice.setRetryTemplate(retryTemplate);
        return retryAdvice;
    }

    @Bean
    @ServiceActivator(inputChannel = "emirOutboundChannel")
    public MessageHandler emirOutbound(AbstractClientConnectionFactory emirTcpCachedClientConnectionFactory) {
        TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
        List<Advice> list = new ArrayList<>();
        list.add(retryAdvice());
        tcpOutboundGateway.setAdviceChain(list);
        tcpOutboundGateway.setRemoteTimeout(16000);
        tcpOutboundGateway.setRequestTimeout(16000);
        tcpOutboundGateway.setSendTimeout(16000);
        tcpOutboundGateway.setConnectionFactory(emirTcpCachedClientConnectionFactory);
        return tcpOutboundGateway;
    }

}

我还在

EmirTcpSocketSupport.java
中配置了KEEP-ALIVE。我怀疑连接因 AWS Nat 网关而关闭(约 350 秒)。所以,我把 KEEPIDLE 设置为 290 秒。

    @Override
    public void postProcessSocket(Socket socket) {
        if (this.sslVerifyHost && socket instanceof SSLSocket) {
            SSLSocket sslSocket = (SSLSocket) socket;
            SSLParameters sslParameters = sslSocket.getSSLParameters();
            sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
            sslSocket.setSSLParameters(sslParameters);
        }
        try {
            socket.setOption(ExtendedSocketOptions.TCP_KEEPIDLE, 290);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

输出控制台: TCP 套接字服务器在主机上运行:审查主机和端口:9999

2023-08-16 12:53:44.150 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:158 - New connection censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.150 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory:313 - emirClientConnectionFactory: Added new connection: censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4

2023-08-16 12:53:44.152 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.152 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:193 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Reading...
2023-08-16 12:53:44.153 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1f32b135, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1f32b135, id=58330247-fe56-1322-c94e-248e1acf6338, timestamp=1692190424136}]
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[175], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=c5ce21d9-cc1b-db8f-1a54-b6fe731d3281, ip_hostname=censored-host, timestamp=1692190424461}]
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:348 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Waiting for listener registration
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:352 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Listener registeredemirDoDeserialize message {}
2023-08-16 12:53:44.461 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[175], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=68f25612-5a89-d6ce-0898-8f44a193509d, ip_hostname=censored-host, timestamp=1692190424461}]
2023-08-16 12:53:44.461 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4

2023-08-16 12:53:44.461 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.462 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2255db4, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2255db4, id=918bfab3-bc85-7393-0c44-3d925fd2ef2f, timestamp=1692190424288}]ResponseDURSEG:null
2023-08-16 12:53:44.502 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=5aba1a68-3f8c-079e-64d7-b0c967f3804a, ip_hostname=censored-host, timestamp=1692190424502}]
2023-08-16 12:53:44.502 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=018f8fe5-23c4-fe3a-246b-5d205e429898, ip_hostname=censored-host, timestamp=1692190424502}]
2023-08-16 12:53:44.502 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4

2023-08-16 12:53:44.502 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.502 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d58f0b5, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d58f0b5, id=3024c123-a418-028b-ea59-99f1c2f3c572, timestamp=1692190424293}]
2023-08-16 12:53:44.541 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=2c8b1c8c-0ef3-c533-4f4a-47abad800aa8, ip_hostname=censored-host, timestamp=1692190424541}]emirDoDeserialize message {}
2023-08-16 12:53:44.541 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=21cfac2a-cd3b-b0db-5dff-8f70e7fa258c, ip_hostname=censored-host, timestamp=1692190424541}]
2023-08-16 12:53:44.541 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4

2023-08-16 12:58:44.543 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory$CachedConnection:413 - Connection Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 has already been released
2023-08-16 13:04:00.661 my-project[Thread Id = 562] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory:313 - emirClientConnectionFactory: Removed closed connection: censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4

我的问题是,即使保持活动时间设置为 290 秒,为什么连接会在 300 秒时关闭? (随后,连接将从缓存中删除,这使我的应用程序为请求建立新的 TCP 连接)

spring-integration spring-integration-ip
1个回答
0
投票

Removed closed connection
调试消息从
AbstractConnectionFactory.removeClosedConnectionsAndReturnOpenConnectionIds()
发出。 该方法的调用堆栈如下:

harvestClosedConnections()
<-
TcpNetClientConnectionFactory.buildNewConnection()
<-
doObtain(boolean singleUse)
<-
obtainNewConnection()
.

逻辑是这样的:

        if (!singleUse) {
            // Another write lock holder might have created a new one by now.
            connection = obtainSharedConnection();
            if (connection != null && connection.isOpen()) {
                return connection;
            }
        }
        return doObtain(singleUse);

因此,即使您使用

singleUse = false
,该
sharedConnection
也会由于某种原因被其套接字关闭:

public boolean isOpen() {
    return !this.socket.isClosed();
}

如果您的服务器确实定期关闭这些套接字,我怀疑我们可以从客户端角度做任何事情。

© www.soinside.com 2019 - 2024. All rights reserved.