我的需求是我想从外部数据库(SQL)加载1000万条数据到ignite服务器中。我正在使用 ignite 的缓存功能,它将这 1000 万条记录存储到我的 ignite 服务器中。我已经使用批处理和分页使用 ignite 的DataStreamer将数据流式传输到服务器中。即使在发送第一批数据之前,我也会收到“可能太长的 JVM 暂停”,即使所有 10处理 100 万条记录后,将数据加载到 ignite 服务器大约需要 40 分钟。我究竟做错了什么? 除了增加堆之外还需要进行任何修改吗? 有没有办法将大量数据从外部数据库流式传输到缓存中?
我的客户端和服务器端 JVM_OPTS :
-Xms4g
-Xmx4g
-XX:+AlwaysPreTouch
-XX:+UseG1GC
-XX:+ScavengeBeforeFullGC
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=55
以下所有代码都在我的客户端上。
用于传输批量数据的客户端代码:
@Service
public class PosWavierService {
private static final Logger logger = LoggerFactory.getLogger(PosWavierService.class);
private static final int BATCH_SIZE = 100_000; // Adjust batch size as needed
private static final int NUM_THREADS = 6;
private final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
@Autowired
private IgniteCacheService igniteCacheService;
@Autowired
private Ignite ignite;
@Autowired
private ProductLinesRepo productLinesRepo;
public CompletableFuture<Void> processAllRecords(String cacheName) {
long startTime = System.currentTimeMillis();
// List to store CompletableFuture for each thread
List<CompletableFuture<Void>> futures = new ArrayList<>();
// Submit tasks for fetching and streaming data concurrently
AtomicInteger pageNumber = new AtomicInteger(0);
for (int i = 0; i < NUM_THREADS; i++) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
while (true) {
List<ProductLines> records = fetchDataFromRepo(pageNumber.getAndIncrement(), BATCH_SIZE);
if (records.isEmpty()) {
break;
}
igniteCacheService.streamBulkData(cacheName, records);
logger.info("Processed {} records for cache {}", records.size(), cacheName);
}
return null;
}); futures.add(future);
}
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
combinedFuture.thenRun(() -> {
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
logger.info("Total time taken for processing all records: {} milliseconds", totalTime);
});
return combinedFuture;
}}
从表中检索数据的存储库:
@Query(value = "SELECT * FROM productlines WHERE pan_no IS NOT NULL AND pan_no != '' AND pan_no != ' ' ", nativeQuery = true)
Page<ProductLines> findRecordsWithPanNotNull(Pageable pageable);
streamBulkData 函数:
public void streamBulkData(String cacheName, List<ProductLines> records) {
try (IgniteDataStreamer<String, ProductLines> streamer = ignite.dataStreamer(cacheName)){
// FileWriter writer = new FileWriter(KEYS_FILE_PATH, true)) { // Append mode
streamer.allowOverwrite(true);
streamer.perNodeBufferSize(1024);
streamer.perNodeParallelOperations(8);
streamer.skipStore(true);
for (ProductLines record : records) {
String key = record.getPan_no();
if (key != null) {
streamer.addData(key, record);
//writer.write(key);
} else {
System.err.println("Skipping record with null key: " + record);
}
}
streamer.flush();
} catch (CacheException e) {
System.err.println("Error streaming data to cache: " + e.getMessage());
e.printStackTrace();
}
}
我相信您正在为每次调用streamBulkData 实例化一个流媒体。我建议创建您的流实例并将对其的引用传递到您的streamBulkData 方法中。这样做可以保持与将数据流式传输到的主机的连接。
您也没有提供任何有关您正在直播的内容的信息! 集群大小(节点数)、堆外内存配置、堆内存大小、网络功能、客户端加载程序和服务器之间的延迟、持久性配置……
请注意,多个 ignite 主机的摄取速度可以比 1 台更快,但我不知道您要流式传输到什么!
希望有帮助。