TcpNioConnection 关闭抛出异常(java.nio.channels.ClosedChannelException)

问题描述 投票:0回答:1

场景是我们需要配置 maxClient,如果达到它,我们必须关闭连接(如果它大于 maxClient)。为此,我们在列表中添加connectionId,如果列表大于最大客户端,则我们关闭TcpNioConnection。但当我们关闭时,它会抛出以下异常。

java.nio.channels.ClosedChannelException:null 在 java.base/java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:222) 在org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.createConnectionForAcceptedChannel(TcpNioServerConnectionFactory.java:260) 在org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.doAccept(TcpNioServerConnectionFactory.java:229) 在org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.keyAcceptable(AbstractConnectionFactory.java:776) 在 org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.handleKey(AbstractConnectionFactory.java:725) 在org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.processNioSelections(AbstractConnectionFactory.java:672) 在 org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.doSelect(TcpNioServerConnectionFactory.java:194) 在org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.run(TcpNioServerConnectionFactory.java:155) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 在 java.base/java.lang.Thread.run(Thread.java:833)

下面是代码

private final Logger log = LoggerFactory.getLogger(TcpConnectionApplicationListener.class);



List<String> connectionIdList = new ArrayList<String>();

@Override
public void onApplicationEvent(TcpConnectionEvent event) {
    
    
    String connectionId = event.getConnectionId();
    String ipAddressPort = null;
    if(Objects.nonNull(connectionId)) {
        ipAddressPort = connectionId.substring(0,connectionId.lastIndexOf(":"));
        ipAddressPort = ipAddressPort.substring(0, ipAddressPort.lastIndexOf(":"));
    }
    log.info("ipAddressPort >>> "+ipAddressPort);
    
    if (event.getSource() instanceof TcpNioConnection) {
        log.info("inside connection factory >>> ");
        TcpNioConnection nioConnection = (TcpNioConnection) event.getSource();
        if(connectionIdList.size() > 1) {
            nioConnection.close();
            log.info("isClosed >>> "+ nioConnection.getSocketInfo().isClosed());
            return;
        }
    }
    
    
    if (event instanceof TcpConnectionOpenEvent) {
        
        connectionIdList.add(event.getConnectionId());
        log.info("TcpConnectionOpenEvent connection id = {}",connectionIdList.size());
        
        log.info("TCP Open connection event with id={}", event.getConnectionId());
       
       
    } else if (event instanceof TcpConnectionCloseEvent) {
        
        connectionIdList.remove(event.getConnectionId());
        log.info("TcpConnectionCloseEvent connection id = {}",connectionIdList.size());
        
        log.info("TCP Close connection event with id={}", event.getConnectionId());
       
    }
    
}

下面是集成流程的代码

IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(port)
          .soReceiveBufferSize(512)
          .deserializer(new MessageDeserializer())
          .serializer(new MessageSerializer())
         )) 
            .log()
            .transform(Transformers.objectToString())
            .handle("outboundService", "processAndSendMessage")
            .get();
}

您能否让我知道如何解决此问题,或者是否有其他方法可以在 Tcp.nioServer 中配置最大客户端数,以便仅存在许多连接。

spring-boot spring-integration
1个回答
0
投票

我建议不要关闭连接,而是抛出异常。 这样

TcpNioServerConnectionFactory
中的逻辑就会是这样的:

private TcpNioConnection createTcpNioConnection(SocketChannel socketChannel) {
    try {
        TcpNioConnection connection = this.tcpNioConnectionSupport.createNewConnection(socketChannel, true,
                isLookupHost(), getApplicationEventPublisher(), getComponentName());
        connection.setUsingDirectBuffers(this.usingDirectBuffers);
        TcpConnectionSupport wrappedConnection = wrapConnection(connection);
        if (!wrappedConnection.equals(connection)) {
            connection.setSenders(getSenders());
        }
        initializeConnection(wrappedConnection, socketChannel.socket());
        wrappedConnection.publishConnectionOpenEvent();
        return connection;
    }
    catch (Exception ex) {
        logger.error(ex, "Failed to establish new incoming connection");
        return null;
    }
}

您也可以通过自定义

TcpNioConnectionSupport
而不是事件侦听器来执行此操作。

© www.soinside.com 2019 - 2024. All rights reserved.