我正在编写一个应用程序,该应用程序形成大约 4000 个连接并发送数据。但不知何故它消耗的内存超过 700 GB。理想的优化方式是什么?有什么垃圾收集器建议可以提供帮助吗?即使在每条消息发送时强制 GC 也没有帮助。
目标是大规模使用 Web 套接字测试持久长连接。
import java.io.IOException;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.websocket.Session;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.NativeWebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import n1wsperf.server.ServerTelemetry.ServerMetricType;
@Component
public class TextSocketHandler extends TextWebSocketHandler {
private volatile static HashMap<String, WebSocketSession> activeSessions = new HashMap<>() ;
private static final int maxSizeInKB = 61440; // 60MB or 480Mb
private static final TextMessage _data = DataService.getData();
private static String modeStr = "Request-Response";
public static void Initialize() {
Timer timer = new Timer("ActiveSessionTimer");
TimerTask task = new TimerTask() {
@Override
public void run() {
ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
System.out.println("Active Connections : " + activeSessions.size());
}
};
CompletableFuture.runAsync( () -> { timer.schedule(task, 1000, 60000L); });
}
public TextSocketHandler() {
modeStr = ServerConfig.getServerRunConfig().DuplexMode ? "Duplex" : "Request-Response";
}
@Async
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
try {
session.setTextMessageSizeLimit(maxSizeInKB * 1024); // 60MB or 480Mb
String sessionId = session.getId();
if (session instanceof NativeWebSocketSession) {
final Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
if (nativeSession != null ) {
nativeSession.getUserProperties()
.put("org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT", 60_000L);
}
}
session.sendMessage(new TextMessage(sessionId));
activeSessions.putIfAbsent(session.getId(), session);
ServerTelemetry.TrackMetric(ServerMetricType.NEWSESSION);
TraceLogger.Info("SocketHandler", String.format("Session(new): %s: Mode: %s: New Connection established with client %s", session.getId(), modeStr,session.getRemoteAddress()), true);
Run run = ServerConfig.getServerRunConfig();
if (run.DuplexMode){
startDuplexCommunicationAsync(sessionId, run); // non-blocking
}
TraceLogger.Info("SocketHandler", String.format("Session (Verbose: %s): %s: Mode: %s Communication is in progress with client %s", run.Verbose, session.getId(), modeStr,session.getRemoteAddress()), true);
} catch (Exception e) {
ServerTelemetry.TrackMetric(ServerMetricType.FAILEDSESSION);
ServerTelemetry.TrackException(e);
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", session.getId(), modeStr,e.getMessage()));
} finally {
System.gc();
}
}
@Async
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
try{
var sessionId = session.getId();
var clientAdr = session.getRemoteAddress();
ServerTelemetry.TrackMetric(ServerMetricType.CLOSEDSESSION);
TraceLogger.Info("SocketHandler", String.format("Session: %s Mode: %s : Closed Connection with client %s", sessionId, modeStr,clientAdr), true);
if (activeSessions.keySet().contains(sessionId)){
activeSessions.remove(sessionId);
ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
}}
finally {
System.gc();
}
}
@Async
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
try {
String sessionId = session.getId();
WebSocketSession wsSession = activeSessions.get(sessionId);
long start = System.nanoTime();
//ServerTelemetry.TrackMetric(ServerMetricType.REQUEST);
String payload = message.getPayload();
//ServerTelemetry.TrackMetric(ServerMetricType.REQUESTSIZE, payload.length()); // Bytes
TraceLogger.Info("SocketHandler", String.format("Session: %s: Mode: %s: Received %s bytes from client %s", sessionId, modeStr,payload.length(), wsSession.getRemoteAddress()));
if (!ServerConfig.getServerRunConfig().DuplexMode) {
CompletableFuture.runAsync( () -> {
try {
sendMessage(wsSession, start);
} catch (IOException | InterruptedException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
}
}).get();
}
long end = System.nanoTime();
double reqDurationms = TimeUnit.NANOSECONDS.toMillis(end - start);
// ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
// ServerTelemetry.TrackMetric(ServerMetricType.DURATION, reqDurationms);
} catch (Exception e) {
ServerTelemetry.TrackMetric(ServerMetricType.FAILEDREQUEST);
ServerTelemetry.TrackException(e);
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", session.getId(), modeStr,e.getMessage()));
} finally {
System.gc();
}
}
@Async
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
try{
WebSocketSession wsSession = activeSessions.get(session.getId());
ServerTelemetry.TrackMetric(ServerMetricType.TRANSPORTERROR);
TraceLogger.Error("SocketHandler", String.format("Session: %s: Mode: %s Error: %s", wsSession.getId(), modeStr,exception.getMessage()));
if (activeSessions.keySet().contains(wsSession.getId())){
activeSessions.remove(wsSession.getId());
ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
}}
finally {
System.gc();
}
}
private void startDuplexCommunicationAsync(String sessionId, Run run) throws IOException, InterruptedException {
CompletableFuture.runAsync(() -> {
long sessionStart = System.nanoTime();
WebSocketSession wsSession = null;
for (long current = System.nanoTime(); current < sessionStart + TimeUnit.SECONDS.toNanos(run.KeepAlive_Sec); ){
try {
wsSession = activeSessions.get(sessionId);
sendMessage(wsSession, System.nanoTime());
current = System.nanoTime();
} catch (IOException | InterruptedException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
}
}
if (wsSession != null && wsSession.isOpen()){
try {
wsSession.close(CloseStatus.NORMAL);
TraceLogger.Info("SocketHandler", String.format("Session: %s Mode: %s: Closing session as the configured keepalive duration (%s seconds) has elapsed, sent session close to client %s", sessionId, modeStr,run.KeepAlive_Sec, wsSession.getRemoteAddress()), true);
} catch (IOException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
}
}
});
}
private void sendMessage(WebSocketSession session, long start) throws IOException, InterruptedException {
if (!session.isOpen()) {
throw new IOException("Session is not in open state");
}
var runCfg = ServerConfig.getServerRunConfig();
try {
session.sendMessage(_data);
//ServerTelemetry.TrackMetric(ServerMetricType.RESPONSESIZE, _data.getPayloadLength()); // Bytes
TraceLogger.Info("SocketHandler", String.format("Session: %s: Mode: %s: %s bytes sent to client %s", session.getId(), modeStr, _data.getPayloadLength(), session.getRemoteAddress()));
long end = System.nanoTime();
long remainMillis = runCfg.RequestInterval_ms - TimeUnit.NANOSECONDS.toMillis(end - start);
if (remainMillis > 0) {
Thread.sleep(remainMillis);
}
} catch (IOException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s: Mode: %s: Exception: %s", session.getId(), modeStr,e.getMessage()));
} finally {
System.gc();
}
}
}
通过对 TextSocketHandler 代码的快速概述,我可以突出显示可能有助于优化内存使用和避免潜在内存泄漏的几点:
用于活动会话的 HashMap:如果会话在关闭时未正确删除,则使用 activeSession 来存储 WebSocket 会话可能会导致内存问题。确保会话在关闭时从 activeSessions 映射中删除,以防止内存泄漏。
异步操作和错误处理:在 WebSocket 方法(afterConnectionEstablished、afterConnectionClosed、handleTextMessage、handleTransportError)中谨慎使用 CompletableFuture.runAsync()。如果不进行有效管理,这些异步操作可能会引入争用和潜在的内存开销。
垃圾收集:在 WebSocket 方法中显式调用 System.gc() 可能没有必要,而且可能会适得其反,因为 Java 的垃圾收集器旨在自动管理内存。
资源处理:始终确保 WebSocket 会话等资源在不再需要时正确关闭并清理。这包括关闭会话、正确处理异常以及释放与会话关联的任何资源。
数据处理和缓冲:考虑优化传入和传出消息(handleTextMessage、sendMessage)的处理,以最大限度地减少内存使用。评估内存中是否不必要地保留了大量有效负载或数据结构。
优化内存使用并避免潜在问题:
在负载下分析应用程序以识别内存热点和导致高内存消耗的区域非常重要。 Java Flight Recorder、VisualVM 或分析器等工具可以帮助识别 WebSocket 代码中的内存泄漏或低效内存使用模式。