Java的内存泄漏使用的LinkedBlockingQueue

问题描述 投票:2回答:1

亲爱,

运行一段时间后,我的Java应用程序抛出一个异常

异常的线程“主题-6” java.lang.OutOfMemoryError:GC开销超过限制

监视JVM内存使用的JProfiler后,我发现,在我的代码中的内存泄漏。

在我的程序给我寄了很多SNMP请求某些设备,并得到他们的响应,并将其推成两个的LinkedBlockingQueue对象。 有一些线程队列,从民意调查,并进行处理,并从队列中删除的对象。但似乎对象不从队列中删除。我看不到我的代码任何问题。任何意见和建议,将不胜感激

ResponseListener.java
public class ResponseListener{
    public void receiveResponse(){
                SNMPResultMessage resultMessage;
                    resultMessage = new SNMPResultMessage(
                            userObject.getDeviceId(),
                            oid,
                            deviceServiceId,
                            sensorId,
                            response.getVariable(oid).toInt(),
                            userObject.getTimestamp(),
                            Utils.getCreateDay(dt),
                            Utils.getQuarterOfDay(dt),
                            slaDisabled
                    );
                    Logger.debug("SNMP Request Response:" + resultMessage);                    

                PubSub.publish(resultMessage);

     }
}

pub sub.Java

公共类PubSub的{

public static final BlockingQueue<ResultMessage> average = new LinkedBlockingQueue<>();
public static final BlockingQueue<ResultMessage> cassandra = new LinkedBlockingQueue<>();
public static void publish(ResultMessage message) {
    average.add(message);
    cassandra.add(message);
}



private PubSub() {
    /* prevent instantiation */
}

}

average watcher.Java

public class AverageWatcher implements Runnable{
    @Override
    public void run() {
        try {
            while (true) {
                ResultMessage message = PubSub.average.take();
                if (message == null) {
                    if (numberOfChanges > 1000) {
                        insertBatch();
                    }
                    continue;
                }
                numberOfChanges++;
                if (changes.get(message.getSensorId()) == null) {
                    changes.put(message.getSensorId(), new HashSet<>());
                }
                changes.get(message.getSensorId()).add(new CreateDayQuarterOfDay(message.getCreateDay(), message.getQuarterOfDay()));
                if (numberOfChanges > 5000) {
                    insertBatch();
                }
            }
        } catch (Exception ex) {
            Logger.error(ex.getCause());
        }
    }
}

Cassandra queue watcher.Java

public class CassandraQueueWatcher implements Runnable{
    @Override
    public void run() {
        try {
            while (true) {
                ResultMessage message = PubSub.cassandra.take();
                if (message == null) {
                    if (batchSize > 1000) {
                        insertBatch();
                    }
                    continue;
                }

                boolean violated = checkViolated(message);

                batch.add(
                        Cassandra.insertRawReportStmt
                                .bind()
                                .setInt("sensor_id", message.getSensorId())
                                .setInt("create_day", message.getCreateDay())
                                .setLong("create_time", message.getTimestamp() / 1000)
                                .setFloat("value", message.getValue())
                                .setBool("violated",violated)
                );
                batchSize++;
                if (batchSize > 2000) {
                    insertBatch();
                }
                message =  null;
            }
        } catch (Exception ex) {
            Logger.error(ex.getCause());
        }
    }

}

在我的代码,只有在ResponseListener.java创建新SNMPResultMessage对象。请看看添附图像。 jprofiler

java multithreading memory-leaks
1个回答
0
投票

我猜想,队列填满,因为你是不是读他们更快地插入新的消息。

请检查队列的大小或使用bounded version

© www.soinside.com 2019 - 2024. All rights reserved.