Ignite 需要更多堆内存

问题描述 投票:0回答:1

我目前正在致力于将 Apache Ignite 构建为缓存层。我的要求是启动时加载1000万条数据到服务器。缓存 400,000 条记录后,我遇到“GC 开销超出”错误。我已经检查了内存泄漏,并且我的代码看起来很好。这个问题可能与我的系统 RAM (8GB) 有关吗?

我尝试通过设置这些 JAVA_OPTS 来增加堆上初始大小。

-Xms512m -Xmx4g -Xmn2048m -XX:+UseParallelGC

设置完成后,我最多可以处理 800,000 条记录,但在那之后,我的 IDE 立即崩溃了。我不得不重新启动系统。

服务器配置:

 IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setIgniteInstanceName("Instance");
        cfg.setConsistentId("Node");

        // Create TCP Communication SPI
        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();

        // Set the socketWriteTimeout to 5 seconds (5000 milliseconds)
        commSpi.setSocketWriteTimeout(5000);

        // Data storage configuration

        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
        DataRegionConfiguration regionCfg = new DataRegionConfiguration();
        regionCfg.setName("500MB_Region");

        regionCfg.setPersistenceEnabled(true);
        regionCfg.setInitialSize(1024L * 1024 * 1024); // 1GB initial size
        regionCfg.setMaxSize(6L * 1024 * 1024 * 1024); // 6GB maximum size

        regionCfg.setMetricsEnabled(true); // Enable metrics for monitoring
        // Data region configuration
        regionCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_LRU);
        regionCfg.setPageReplacementMode(PageReplacementMode.RANDOM_LRU);
        storageCfg.setDefaultDataRegionConfiguration(regionCfg);

        CacheConfiguration<String, String> marksCacheCfg = new CacheConfiguration<>();
        marksCacheCfg.setName("poswavierCache");
        marksCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        marksCacheCfg.setCacheMode(CacheMode.REPLICATED);
        marksCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);

        cfg.setCacheConfiguration(marksCacheCfg);
        cfg.setPeerClassLoadingEnabled(true);
        cfg.setDataStorageConfiguration(storageCfg);


        Ignite igniteServer = Ignition.start(cfg);
        igniteServer.cluster().state(ClusterState.ACTIVE);

        IgniteCache<String, String> marksCache = igniteServer.getOrCreateCache("poswavierCache");
        igniteServer.resetLostPartitions(Arrays.asList("poswavierCache"));
        igniteServer.cluster().baselineAutoAdjustEnabled(true);

这就是我使用 datastreamer 将数据推送到服务器的方式:

 private static final int BATCH_SIZE = 100000;

    @Autowired
    private IgniteCacheService igniteCacheService;

    @Autowired
    private ProductLinesRepo productLinesRepo;

    public CompletableFuture<Void> processAllRecords() {
        long startTime = System.currentTimeMillis();

        int pageNumber = 0;
        Page<ProductLines> page;
        CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

        do {
            page = productLinesRepo.findRecordsWithPanNotNull(PageRequest.of(pageNumber++, BATCH_SIZE));
            List<ProductLines> records = page.getContent();
            if (!records.isEmpty()) {
                int finalPageNumber = pageNumber;
                future = future.thenCompose(result ->
                        CompletableFuture.runAsync(() -> {
                            igniteCacheService.streamBulkData("poswavierCache", records);
                            logger.info("Processed {} records", (finalPageNumber - 1) * BATCH_SIZE + records.size());
                        }));

            }
        } while (page.hasNext());

        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;
        logger.info("Total time taken for processing all records: {} milliseconds", totalTime);

        return future;
    }

数据流:

public void streamBulkData(String cacheName, List<ProductLines> records) {
        try (IgniteDataStreamer<String, ProductLines> streamer = ignite.dataStreamer(cacheName)) {
            streamer.allowOverwrite(true);

            for (ProductLines record : records) {
                String key = record.getPan_no();
                if (key != null) {
                    streamer.addData(key, record);
                } else {
                    System.err.println("Skipping record with null key: " + record);
                }
            }

            streamer.flush();

        } catch (CacheException e) {
            System.err.println("Error streaming data to cache: " + e.getMessage());
            e.printStackTrace();
        }
    }
java caching ignite
1个回答
0
投票

这里没有一件明显的事情表明问题所在,但有一些事情毫无价值。

首先,从您所写的内容来看,您似乎有一个具有 8Gb 内存的节点。对于分布式内存数据库来说,这并不是很多。

其次,您似乎过度使用了您的机器。由于 Ignite 位于内存中,因此永远不应该允许它交换到磁盘。但是,您已经配置了 4Gb 的堆空间和 6Gb 的堆外空间。即使在考虑操作系统、Java 和任何其他开销之前,这也已经超过了您拥有的 8Gb 内存。

这不会影响您的内存使用,但您已经配置了逐出和持久性。选择一个——可能是坚持。

我不确定我是否了解您的数据流媒体在哪里(我假设它是客户端),但它将使用大量内存。您确实想要流式传输数据,而不是将其全部复制到内存中并大批量发布。

最后,您已将 Java 配置为使用并行垃圾收集器,该收集器非常旧,不太适合像 Ignite 这样的大内存、多核系统。一般建议使用G1。更多信息请参见

文档

© www.soinside.com 2019 - 2024. All rights reserved.