我已经为依赖于 TCP/IP 协议的面向流的套接字实现了 TCPClient。我已经使用了 Spring Integration。
我的期望是创建 9 个连接的池。流量将仅由这 9 个 TCP 连接处理。但我意识到连接在 300 秒后就被关闭了。 (日志的最后三行)这会导致为新请求建立新的 TCP 连接。客户(我们的 TCP 服务器)说他们从不关闭连接。客户报告说我的应用程序正在尝试建立比分配数量更多的连接(即 9)。
这是配置。
@Bean
public AbstractClientConnectionFactory emirClientConnectionFactory() {
TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("censored-host", 9999);
tcpNetClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
tcpNetClientConnectionFactory.setDeserializer(new EmirCustomStxHeaderLengthSerializer());
tcpNetClientConnectionFactory.setSerializer(new EmirCustomStxHeaderLengthSerializer());
tcpNetClientConnectionFactory.setSingleUse(false);
tcpNetClientConnectionFactory.setSoKeepAlive(true);
TcpSocketSupport emirTcpSocketSupport = new emirTcpSocketSupport();
tcpNetClientConnectionFactory.setTcpSocketSupport(emirTcpSocketSupport);
return tcpNetClientConnectionFactory;
}
@Bean
public AbstractClientConnectionFactory emirTcpCachedClientConnectionFactory() {
CachingClientConnectionFactory cachingConnFactory = new CachingClientConnectionFactory(emirClientConnectionFactory(), 9);
cachingConnFactory.setSingleUse(false);
cachingConnFactory.setLeaveOpen(true);
cachingConnFactory.setSoKeepAlive(true);
this.cachingClientConnectionFactory = cachingConnFactory;
return cachingConnFactory;
}
@Bean
public MessageChannel emirOutboundChannel() {
return new DirectChannel();
}
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(50);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
retryAdvice.setRetryTemplate(retryTemplate);
return retryAdvice;
}
@Bean
@ServiceActivator(inputChannel = "emirOutboundChannel")
public MessageHandler emirOutbound(AbstractClientConnectionFactory emirTcpCachedClientConnectionFactory) {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
List<Advice> list = new ArrayList<>();
list.add(retryAdvice());
tcpOutboundGateway.setAdviceChain(list);
tcpOutboundGateway.setRemoteTimeout(16000);
tcpOutboundGateway.setRequestTimeout(16000);
tcpOutboundGateway.setSendTimeout(16000);
tcpOutboundGateway.setConnectionFactory(emirTcpCachedClientConnectionFactory);
return tcpOutboundGateway;
}
}
我还在
EmirTcpSocketSupport.java
中配置了KEEP-ALIVE。我怀疑连接因 AWS Nat 网关而关闭(约 350 秒)。所以,我把 KEEPIDLE 设置为 290 秒。
@Override
public void postProcessSocket(Socket socket) {
if (this.sslVerifyHost && socket instanceof SSLSocket) {
SSLSocket sslSocket = (SSLSocket) socket;
SSLParameters sslParameters = sslSocket.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
sslSocket.setSSLParameters(sslParameters);
}
try {
socket.setOption(ExtendedSocketOptions.TCP_KEEPIDLE, 290);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
输出控制台: TCP 套接字服务器在主机上运行:审查主机和端口:9999
2023-08-16 12:53:44.150 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:158 - New connection censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.150 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory:313 - emirClientConnectionFactory: Added new connection: censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.152 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.152 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:193 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Reading...
2023-08-16 12:53:44.153 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1f32b135, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1f32b135, id=58330247-fe56-1322-c94e-248e1acf6338, timestamp=1692190424136}]
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[175], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=c5ce21d9-cc1b-db8f-1a54-b6fe731d3281, ip_hostname=censored-host, timestamp=1692190424461}]
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:348 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Waiting for listener registration
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:352 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Listener registeredemirDoDeserialize message {}
2023-08-16 12:53:44.461 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[175], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=68f25612-5a89-d6ce-0898-8f44a193509d, ip_hostname=censored-host, timestamp=1692190424461}]
2023-08-16 12:53:44.461 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.461 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.462 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2255db4, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2255db4, id=918bfab3-bc85-7393-0c44-3d925fd2ef2f, timestamp=1692190424288}]ResponseDURSEG:null
2023-08-16 12:53:44.502 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=5aba1a68-3f8c-079e-64d7-b0c967f3804a, ip_hostname=censored-host, timestamp=1692190424502}]
2023-08-16 12:53:44.502 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=018f8fe5-23c4-fe3a-246b-5d205e429898, ip_hostname=censored-host, timestamp=1692190424502}]
2023-08-16 12:53:44.502 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.502 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.502 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d58f0b5, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d58f0b5, id=3024c123-a418-028b-ea59-99f1c2f3c572, timestamp=1692190424293}]
2023-08-16 12:53:44.541 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=2c8b1c8c-0ef3-c533-4f4a-47abad800aa8, ip_hostname=censored-host, timestamp=1692190424541}]emirDoDeserialize message {}
2023-08-16 12:53:44.541 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=21cfac2a-cd3b-b0db-5dff-8f70e7fa258c, ip_hostname=censored-host, timestamp=1692190424541}]
2023-08-16 12:53:44.541 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:58:44.543 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory$CachedConnection:413 - Connection Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 has already been released
2023-08-16 13:04:00.661 my-project[Thread Id = 562] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory:313 - emirClientConnectionFactory: Removed closed connection: censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
我的问题是,即使保持活动时间设置为 290 秒,为什么连接会在 300 秒时关闭? (随后,连接将从缓存中删除,这使我的应用程序为请求建立新的 TCP 连接)
Removed closed connection
调试消息从 AbstractConnectionFactory.removeClosedConnectionsAndReturnOpenConnectionIds()
发出。
该方法的调用堆栈如下:
harvestClosedConnections()
<- TcpNetClientConnectionFactory.buildNewConnection()
<- doObtain(boolean singleUse)
<- obtainNewConnection()
.
逻辑是这样的:
if (!singleUse) {
// Another write lock holder might have created a new one by now.
connection = obtainSharedConnection();
if (connection != null && connection.isOpen()) {
return connection;
}
}
return doObtain(singleUse);
因此,即使您使用
singleUse = false
,该 sharedConnection
也会由于某种原因被其套接字关闭:
public boolean isOpen() {
return !this.socket.isClosed();
}
如果您的服务器确实定期关闭这些套接字,我怀疑我们可以从客户端角度做任何事情。