所以我的问题在这里。现在,如果我的服务器有20个以上的客户端,则它也有20个线程,而带有ryzen CPU的桌面在30个线程时的使用率将达到100%。现在,我想通过一台服务器处理大量客户端,但是CPU只是被过度使用了。我的智慧很简单,但必须有更好的方法。因为到目前为止,我已经看到很多好的Java服务器。我不知道我做错了什么。在下面的文章中,我将分享我的代码,原则上我该如何做。
while(this.isRunning()) {
ServerSocket server = new ServerSocket(8081);
Socket s = server.accept();
new Thread(new WorkerRunnable(s)).start();
//now here if e.g. over 25 users connect there are 25 threads. CPU is at 100%. Is there a better way to handle this?
可运行的工人正在标识客户端。之后,他们将进入聊天室。就像是群聊,例如
编辑:我未完成的代码的相关部分仍然很清晰
private boolean state;
private ServerSocket socket;
@Override
public void run() {
while(this.isRunning()==true) {
try {
if(this.socket==null) this.socket = new ServerSocket(this.getPort());
Socket connection = this.socket.accept();
IntroductionSession session = new IntroductionSession(this, connection);
new Thread(session).start();
//register timeout task for 3 secs and handle it async
System.out.println(ManagementFactory.getThreadMXBean().getThreadCount());
//this.handleIncomingConnection(connection);
} catch(Exception e) {
e.printStackTrace();
//System.exit(1);
}
}
}
私有类IntroductionSession实现了Runnable {private boolean alive = true;
private BaseServer server;
private Socket socket;
private boolean introduced = false;
public IntroductionSession(BaseServer server, Socket socket) {
this.server = server;
this.socket = socket;
}
private void interrupt() {
System.out.println("Not mroe alive");
this.alive = false;
}
private void killConnection() {
this.killConnection("no_reason");
}
private void killConnection(String reason) {
try {
if(this.from_client!=null) this.from_client.close();
if(this.to_client!=null) this.to_client.close();
this.socket.close();
switch(reason) {
case "didnt_introduce":
System.out.println("Kicked connection, cause it didn't introduce itself");
break;
case "unknown_type":
System.out.println("Kicked unknown connection-type.");
break;
case "no_reason":
default:
//ignore
break;
}
} catch (IOException e) {
switch(reason) {
case "didnt_introduce":
System.out.println("Error at kicking connection, which didn't introduce itself");
break;
case "unknown_type":
System.out.println("Error at kicking unknown connection-type.");
break;
case "no_reason":
default:
System.out.println("Error occured at kicking connection");
break;
}
e.printStackTrace();
}
}
private ObjectInputStream from_client;
private ObjectOutputStream to_client;
@Override
public void run() {
while(this.alive==true) {
try {
if(this.to_client==null) {
this.to_client = new ObjectOutputStream(this.socket.getOutputStream());
//this.to_client.flush();
}
if(this.from_client==null) this.from_client = new ObjectInputStream(this.socket.getInputStream());
//Time runs now, if socket is inactive its getting kicked
new Timer().schedule(new java.util.TimerTask() {
@Override
public void run() {
if(IntroductionSession.this.introduced==false) {
IntroductionSession.this.killConnection("didnt_introduce");
Thread.currentThread().interrupt();
IntroductionSession.this.interrupt();
}
}
}, 5000
);
Object obj = this.from_client.readObject();
while(obj!=null) {
if(obj instanceof IntroductionPacket) {
IntroductionPacket pk = (IntroductionPacket) obj;
introduced = true;
if(isCompatible(pk)==false) {
try {
this.to_client.writeObject(new DifferentVersionKickPacket(BaseServer.version));
this.to_client.close();
this.from_client.close();
IntroductionSession.this.socket.close();
System.out.println("Kicked socket, which uses another version.");
} catch(Exception e) {
Thread.currentThread().interrupt();
//ignore
System.out.println("Error at kicking incompatible socket.");
e.printStackTrace();
}
} else {
this.server.handleIncomingConnection(this.socket, this.from_client, this.to_client);
}
Thread.currentThread().interrupt();
}
}
} catch(StreamCorruptedException e) {
//unknown client-type = kick
this.killConnection("unknown_type");
} catch (IOException|ClassNotFoundException e) {
e.printStackTrace();
this.killConnection("no_reason");
}/* catch(SocketException e) {
}*/
}
Thread.currentThread().interrupt();
}
}
扩展类,它是实际的服务器:
@Override
public void handleIncomingConnection(Socket connection, ObjectInputStream from_client, ObjectOutputStream to_client) {
new AuthenticationSession(connection, from_client, to_client).run();
}
private class AuthenticationSession implements Runnable {
private Socket socket;
private ObjectInputStream from_client;
private ObjectOutputStream to_client;
public AuthenticationSession(Socket socket, ObjectInputStream from_client, ObjectOutputStream to_client) {
this.socket = socket;
this.to_client = to_client;
this.from_client = from_client;
}
//TODO: Implement app id for access tokens
@Override
public void run() {
try {
while(this.socket.isConnected()==true) {
/*ObjectOutputStream to_client = new ObjectOutputStream(socket.getOutputStream()); //maybe cause problems, do it later if it does
ObjectInputStream from_client = new ObjectInputStream(socket.getInputStream());*/
Object object = from_client.readObject();
while(object!=null) {
if(object instanceof RegisterPacket) {
RegisterPacket regPacket = (RegisterPacket) object;
System.out.println("Username:" + regPacket + ", password: " + regPacket.password + ", APP-ID: " + regPacket.appId);
} else {
System.out.println("IP " + this.socket.getInetAddress().getHostAddress() + ":" + this.socket.getPort() + " tried to send an unknown packet.");
this.socket.close();
}
}
}
}/* catch(EOFException eofe) {
//unexpected disconnect
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
catch(Exception e) {
e.printStackTrace();
System.exit(1);
}
/*catch(Exception e) {
//e.printStackTrace();
Thread.currentThread().interrupt();
}*/
}
}
[请不要看看它非常糟糕的格式和我为修复它所做的工作,尽管如此,这些任务也不会消失。
由于您正在执行聊天应用程序,因此您需要考虑进行单线程事件循环。
您可以保留字符串(客户端ID)和套接字(客户端套接字)的映射。
Map<String, Socket> clientSockets;
您的服务器线程将接受新的客户端套接字,并将其放在上面的映射中。然后,将有另一个线程执行事件循环,并且只要InputStream
中的任何客户端套接字中有数据,它都应将该数据发送到所有其他客户端套接字(群聊)。这应该在休眠间隔内无限发生。
[通常,在生产级服务器代码中,我们不直接创建套接字和处理请求。使用低水平插座,紧密连接并防止泄漏是一场噩梦。相反,我们依赖于生产级框架,例如Java Spring Framework或Play Framework。
我的问题是,为什么您不使用任何服务器端框架,例如上面列出的框架?
如果您想知道这些框架如何处理数千个并发请求,请研究诸如Thread Pool之类的设计模式。这些框架消除了复杂性并为您处理线程池。
如果不希望客户立即收到答复,您也可以考虑引入messaging queue such as Kafka。服务器将从队列中一个一个地挑选消息并进行处理。但是,请记住,这是异步的,可能无法满足您的要求。
如果您不仅限于一台服务器,您可以考虑将服务器代码部署到Azure或AWS VMSS(虚拟机规模集)。根据您配置的CPU负载规则,系统将自动为您扩展规模并动态管理资源。
我建议您阅读与服务器有关的系统设计原则,以加深您的理解。
不要重新发明轮子。