处理多个客户端(使用线程的最佳方法?)

问题描述 投票:0回答:2

所以我的问题在这里。现在,如果我的服务器有20个以上的客户端,则它也有20个线程,而带有ryzen CPU的桌面在30个线程时的使用率将达到100%。现在,我想通过一台服务器处理大量客户端,但是CPU只是被过度使用了。我的智慧很简单,但必须有更好的方法。因为到目前为止,我已经看到很多好的Java服务器。我不知道我做错了什么。在下面的文章中,我将分享我的代码,原则上我该如何做。

while(this.isRunning()) {
ServerSocket server = new ServerSocket(8081);
Socket s = server.accept();
new Thread(new WorkerRunnable(s)).start();
//now here if e.g. over 25 users connect there are 25 threads. CPU is at 100%. Is there a better way to handle this?

可运行的工人正在标识客户端。之后,他们将进入聊天室。就像是群聊,例如

编辑:我未完成的代码的相关部分仍然很清晰

private boolean state;
private ServerSocket socket;

@Override
public void run() {
    while(this.isRunning()==true) {
        try {
            if(this.socket==null) this.socket = new ServerSocket(this.getPort());
            Socket connection = this.socket.accept();




            IntroductionSession session = new IntroductionSession(this, connection);
            new Thread(session).start();
            //register timeout task for 3 secs and handle it async



            System.out.println(ManagementFactory.getThreadMXBean().getThreadCount());
            //this.handleIncomingConnection(connection);
        } catch(Exception e) {
            e.printStackTrace();
            //System.exit(1);
        }
    }
}

私有类IntroductionSession实现了Runnable {private boolean alive = true;

    private BaseServer server;
    private Socket socket;
    private boolean introduced = false;

    public IntroductionSession(BaseServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    private void interrupt() {
        System.out.println("Not mroe alive");
        this.alive = false;
    }

    private void killConnection() {
        this.killConnection("no_reason");
    }

    private void killConnection(String reason) {
        try {
            if(this.from_client!=null) this.from_client.close();
            if(this.to_client!=null) this.to_client.close();
            this.socket.close();

            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Kicked connection, cause it didn't introduce itself");
                break;
                case "unknown_type":
                    System.out.println("Kicked unknown connection-type.");
                break;
                case "no_reason":
                default:
                    //ignore
                break;
            }
        } catch (IOException e) {
            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Error at kicking connection, which didn't introduce itself");
                break;
                case "unknown_type":
                    System.out.println("Error at kicking unknown connection-type.");
                break;
                case "no_reason":
                default:
                    System.out.println("Error occured at kicking connection");
                break;
            }

            e.printStackTrace();

        }
    }

    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    @Override
    public void run() {
        while(this.alive==true) {
            try {
                if(this.to_client==null) {
                    this.to_client = new ObjectOutputStream(this.socket.getOutputStream());
                    //this.to_client.flush();
                }
                if(this.from_client==null) this.from_client = new ObjectInputStream(this.socket.getInputStream());
                //Time runs now, if socket is inactive its getting kicked
                new Timer().schedule(new java.util.TimerTask() {
                        @Override
                        public void run() {
                            if(IntroductionSession.this.introduced==false) {
                                IntroductionSession.this.killConnection("didnt_introduce");
                                Thread.currentThread().interrupt();
                                IntroductionSession.this.interrupt();
                            }
                        }
                    }, 5000
                );

                Object obj = this.from_client.readObject();
                while(obj!=null) {
                    if(obj instanceof IntroductionPacket) {
                        IntroductionPacket pk = (IntroductionPacket) obj;
                        introduced = true;

                        if(isCompatible(pk)==false) {
                            try {
                                this.to_client.writeObject(new DifferentVersionKickPacket(BaseServer.version));
                                this.to_client.close();
                                this.from_client.close();
                                IntroductionSession.this.socket.close();
                                System.out.println("Kicked socket, which uses another version.");
                            } catch(Exception e) {
                                Thread.currentThread().interrupt();
                                //ignore
                                System.out.println("Error at kicking incompatible socket.");
                                e.printStackTrace();
                            }
                        } else {
                            this.server.handleIncomingConnection(this.socket, this.from_client, this.to_client);
                        }

                        Thread.currentThread().interrupt();
                    }
                }
            } catch(StreamCorruptedException e) {
                //unknown client-type = kick
                this.killConnection("unknown_type");
            } catch (IOException|ClassNotFoundException e) {
                e.printStackTrace();
                this.killConnection("no_reason");
            }/* catch(SocketException e) {

            }*/
        }
        Thread.currentThread().interrupt();
    }
}

扩展类,它是实际的服务器:

@Override
public void handleIncomingConnection(Socket connection, ObjectInputStream from_client, ObjectOutputStream to_client) {
    new AuthenticationSession(connection, from_client, to_client).run();
}

private class AuthenticationSession implements Runnable {
    private Socket socket;
    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    public AuthenticationSession(Socket socket, ObjectInputStream from_client, ObjectOutputStream to_client) {
        this.socket = socket;
        this.to_client = to_client;
        this.from_client = from_client;
    }
    //TODO: Implement app id for access tokens
    @Override
    public void run() {
        try {
            while(this.socket.isConnected()==true) {
                /*ObjectOutputStream to_client = new ObjectOutputStream(socket.getOutputStream()); //maybe cause problems, do it later if it does
                ObjectInputStream from_client = new ObjectInputStream(socket.getInputStream());*/

                Object object = from_client.readObject();

                while(object!=null) {
                    if(object instanceof RegisterPacket) {
                        RegisterPacket regPacket = (RegisterPacket) object;

                        System.out.println("Username:" + regPacket + ", password: " + regPacket.password + ", APP-ID: " + regPacket.appId);
                    } else {
                        System.out.println("IP " + this.socket.getInetAddress().getHostAddress() + ":" + this.socket.getPort() + " tried to send an unknown packet.");
                        this.socket.close();
                    }
                }
            }
        }/* catch(EOFException eofe) {
            //unexpected disconnect

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
        catch(Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

        /*catch(Exception e) {
            //e.printStackTrace();

            Thread.currentThread().interrupt();
        }*/
    }

}

[请不要看看它非常糟糕的格式和我为修复它所做的工作,尽管如此,这些任务也不会消失。

java multithreading sockets networking serversocket
2个回答
0
投票

由于您正在执行聊天应用程序,因此您需要考虑进行单线程事件循环。

您可以保留字符串(客户端ID)和套接字(客户端套接字)的映射。

Map<String, Socket> clientSockets;

您的服务器线程将接受新的客户端套接字,并将其放在上面的映射中。然后,将有另一个线程执行事件循环,并且只要InputStream中的任何客户端套接字中有数据,它都应将该数据发送到所有其他客户端套接字(群聊)。这应该在休眠间隔内无限发生。


0
投票

[通常,在生产级服务器代码中,我们不直接创建套接字和处理请求。使用低水平插座,紧密连接并防止泄漏是一场噩梦。相反,我们依赖于生产级框架,例如Java Spring FrameworkPlay Framework

我的问题是,为什么您不使用任何服务器端框架,例如上面列出的框架?

  1. 如果您想知道这些框架如何处理数千个并发请求,请研究诸如Thread Pool之类的设计模式。这些框架消除了复杂性并为您处理线程池。

  2. 如果不希望客户立即收到答复,您也可以考虑引入messaging queue such as Kafka。服务器将从队列中一个一个地挑选消息并进行处理。但是,请记住,这是异步的,可能无法满足您的要求。

  3. 如果您不仅限于一台服务器,您可以考虑将服务器代码部署到Azure或AWS VMSS(虚拟机规模集)。根据您配置的CPU负载规则,系统将自动为您扩展规模并动态管理资源。

我建议您阅读与服务器有关的系统设计原则,以加深您的理解。

不要重新发明轮子。

© www.soinside.com 2019 - 2024. All rights reserved.