将大数据从外部数据库流式传输到 Ignite 服务器需要太长时间

问题描述 投票:0回答:1

我的需求是我想从外部数据库(SQL)加载1000万条数据到ignite服务器中。我正在使用 ignite 的缓存功能,它将这 1000 万条记录存储到我的 ignite 服务器中。我已经使用批处理和分页使用 ignite 的DataStreamer将数据流式传输到服务器中。即使在发送第一批数据之前,我也会收到“可能太长的 JVM 暂停”,即使所有 10处理 100 万条记录后,将数据加载到 ignite 服务器大约需要 40 分钟。我究竟做错了什么? 除了增加堆之外还需要进行任何修改吗? 有没有办法将大量数据从外部数据库流式传输到缓存中?

我的客户端和服务器端 JVM_OPTS :

-Xms4g
-Xmx4g
-XX:+AlwaysPreTouch
-XX:+UseG1GC
-XX:+ScavengeBeforeFullGC
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=55

以下所有代码都在我的客户端上。

用于传输批量数据的客户端代码:

     @Service
        public class PosWavierService {
        private static final Logger logger = LoggerFactory.getLogger(PosWavierService.class);
        private static final int BATCH_SIZE = 100_000; // Adjust batch size as needed
        private static final int NUM_THREADS = 6;
    
        private final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
    
        @Autowired
        private IgniteCacheService igniteCacheService;
    
        @Autowired
        private Ignite ignite;
    
        @Autowired
        private ProductLinesRepo productLinesRepo;
    
    
        public CompletableFuture<Void> processAllRecords(String cacheName) {
            long startTime = System.currentTimeMillis();
    
            // List to store CompletableFuture for each thread
            List<CompletableFuture<Void>> futures = new ArrayList<>();
    
            // Submit tasks for fetching and streaming data concurrently
            AtomicInteger pageNumber = new AtomicInteger(0);
            for (int i = 0; i < NUM_THREADS; i++) {
                CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                    while (true) {
                        List<ProductLines> records = fetchDataFromRepo(pageNumber.getAndIncrement(), BATCH_SIZE);
                        if (records.isEmpty()) {
                            break;
                        }
                        igniteCacheService.streamBulkData(cacheName, records);
                        logger.info("Processed {} records for cache {}", records.size(), cacheName);
                    }
                    return null;
                });             futures.add(future);
            }
    
            CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0]));
    
            combinedFuture.thenRun(() -> {
                long endTime = System.currentTimeMillis();
                long totalTime = endTime - startTime;
                logger.info("Total time taken for processing all records: {} milliseconds", totalTime);
            });
    
            return combinedFuture;
        }}

从表中检索数据的存储库:

    @Query(value = "SELECT * FROM productlines WHERE pan_no IS NOT NULL AND pan_no != '' AND pan_no != ' ' ", nativeQuery = true)
    Page<ProductLines> findRecordsWithPanNotNull(Pageable pageable);

streamBulkData 函数:

     public void streamBulkData(String cacheName, List<ProductLines> records) {
        try (IgniteDataStreamer<String, ProductLines> streamer = ignite.dataStreamer(cacheName)){
           //  FileWriter writer = new FileWriter(KEYS_FILE_PATH, true)) { // Append mode
    
            streamer.allowOverwrite(true);
            streamer.perNodeBufferSize(1024);
            streamer.perNodeParallelOperations(8);
            streamer.skipStore(true);
    
            for (ProductLines record : records) {
                String key = record.getPan_no();
                if (key != null) {
                    streamer.addData(key, record);
                    //writer.write(key);
                } else {
                    System.err.println("Skipping record with null key: " + record);
                }
            }
            streamer.flush();
    
        } catch (CacheException e) {
            System.err.println("Error streaming data to cache: " + e.getMessage());
            e.printStackTrace();
        }
    }

java caching ignite apacheignite
1个回答
0
投票

我相信您正在为每次调用streamBulkData 实例化一个流媒体。我建议创建您的流实例并将对其的引用传递到您的streamBulkData 方法中。这样做可以保持与将数据流式传输到的主机的连接。

您也没有提供任何有关您正在直播的内容的信息! 集群大小(节点数)、堆外内存配置、堆内存大小、网络功能、客户端加载程序和服务器之间的延迟、持久性配置……

请注意,多个 ignite 主机的摄取速度可以比 1 台更快,但我不知道您要流式传输到什么!

希望有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.