Hazelcast Jet: joining multiple data sources slows down the machine and causes high memory usage


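I have built a Spring Boot application in which we use Hazelcast Jet to join multiple data sources. It works well for small data sources, but once the sources grow beyond 1 million records it starts consuming too much memory and appears to leak memory. I am not sure what is going wrong. How can I speed up the join and reduce memory usage? I have allocated 8 GB of RAM to the application.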

Below is the code that invokes the pipeline:

UUID idOne = UUID.randomUUID();
DataFactory av = new DataFactory(idOne.toString());
av.buildJetInstance();
Map<String, Object> p = new HashMap<String, Object>();
p.putAll((Map<String, Object>) data.get("alldata"));
av.readMaptoJsonFile(p);
Pipeline pl = av.createPipeline();
av.runProcess(pl);

public class DataFactory implements Serializable {

    public String uid;
    public DataFactory(String uid) {
        this.uid = uid;
    }
    
    public JetInstance buildJetInstance() {
        

        JetConfig jetConfig = new JetConfig();

        jetConfig.getInstanceConfig().setCooperativeThreadCount(5);
        jetConfig.configureHazelcast(c -> {
            c.getNetworkConfig().setReuseAddress(true);
            c.setClusterName("DATA" + UUID.randomUUID().toString());
            c.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true).setMembers(Arrays.asList(new String[] {"localhost"}));
            
        });
        EtlObjects.jetInstance = Jet.newJetInstance(jetConfig);
        return EtlObjects.jetInstance;
    }
    
    public Pipeline createPipeline() {
        return Pipeline.create();
    }
    
    public void joinPipeToJet(Pipeline pl, String name) {
        JobConfig j = new JobConfig();
        j.setName(name);
        EtlObjects.jetInstance.newJob(pl,j).join();
    }
    
    public void readMaptoJsonFile(final Map<String, Object> data) {
        String jobid = UUID.randomUUID().toString();
        try {
            Pipeline pl = createPipeline();
            UUID idOne = UUID.randomUUID();
            
             final IMap<Object, Object> abc = EtlObjects.jetInstance.getMap(idOne.toString());
             abc.putAll(data);
            
             final BatchSource batchSource = Sources.map(abc);
             
                    pl.readFrom(batchSource)
                        .writeTo(Sinks.map(this.uid));
                    
            joinPipeToJet(pl, jobid);
             abc.destroy();
        } catch (Exception e) {
            Job j1 = EtlObjects.jetInstance.getJob(jobid);
            if (j1 != null) {
                j1.cancel();
            }
        } finally {
            Job j1 = EtlObjects.jetInstance.getJob(jobid);
            if (j1 != null) {
                j1.cancel();
            }
        }
    }
    
    public Map<String, Object> runProcess(final Pipeline pl) {
        
        //Where all source batch will be stored
        final Map<String, BatchStage<Object>> allBatch = new HashMap<String, BatchStage<Object>>();
    
        final Map<String, Object> data = // will get data from Pipeline 
        
        
        // Here we get data from the different data sources (JDBC, CSV, text, etc.) and store it in allBatch
        ((List<Map<String, Object>>)data.get("sources")).stream().forEach(z -> {
        
                //jdbcSource where we get data from database
            allBatch.put(z.get("id").toString(), pl.readFrom(jdbcSource));
        
        });
        
        // Here we write the logic for joining multiple data sources
        ((List<Map<String, Object>>)data.get("joins")).stream().forEach(z -> {
                allBatch.put("result",new JoinData().join(data, allBatch.get(z.get("id1").toString()), allBatch.get(z.get("id2").toString()),pl,joinKeys));
        });
        
        
        
    }
    }

Below is the inner join I use to join the data sources:

    public BatchStage<Object> JoinData() {
    
            //Here is the logic for the inner joining
    
             BatchStageWithKey<Object, String> jdbcGroupByKey = batch1.filter(k -> ((Map<String, Object>)k).get(col1) != null).groupingKey(jdbcData ->  {
                    // group by join key
            });
            BatchStageWithKey<Object, String> csvGroupByKey = batch2.filter(k -> ((Map<String, Object>)k).get(col1) != null).groupingKey(jdbcData ->  {
                    // group by join key
            });
            
            //Aggregate here
            BatchStage<Entry<String, Tuple2<List<Object>, List<Object>>>> d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(),csvGroupByKey,AggregateOperations.toList());
            
            BatchStage<List<Object>> jdbcBatchStageData = d.map(e -> { 
                    // joining
            });
            
            return jdbcBatchStageData;
            
    }
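
A possible contributor to the memory use: `aggregate2` with `AggregateOperations.toList()` on both sides has to collect every row of both inputs into per-key lists before the joining step can run. A hash join instead indexes only the smaller input and streams the larger one past it. The following is a plain-Java sketch of that technique under stated assumptions, not Hazelcast Jet API: `HashJoinSketch`, `innerJoin`, and the `Map<String, Object>` row representation are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java illustration of a hash join; hypothetical names, not Hazelcast Jet API.
class HashJoinSketch {

    /**
     * Inner-joins two inputs on joinKey. Only buildSide (ideally the smaller
     * input) is held in memory; probeSide is read one row at a time and each
     * joined row is handed to sink immediately.
     */
    public static void innerJoin(List<Map<String, Object>> buildSide,
                                 Iterator<Map<String, Object>> probeSide,
                                 String joinKey,
                                 Consumer<Map<String, Object>> sink) {
        // Index the build side by join key, skipping rows whose key is null.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> row : buildSide) {
            Object key = row.get(joinKey);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
        // Stream the probe side; rows without a match are dropped (inner join).
        while (probeSide.hasNext()) {
            Map<String, Object> probeRow = probeSide.next();
            Object key = probeRow.get(joinKey);
            List<Map<String, Object>> matches = key == null ? null : index.get(key);
            if (matches == null) {
                continue;
            }
            for (Map<String, Object> buildRow : matches) {
                // On a column-name clash the probe row's value wins.
                Map<String, Object> joined = new HashMap<>(buildRow);
                joined.putAll(probeRow);
                sink.accept(joined);
            }
        }
    }
}
```

In this sketch, peak memory is driven by the size of the build side rather than by both inputs together. Jet's pipeline API has a `hashJoin` stage that works along similar lines; whether it fits here depends on whether one of the joined sources is small enough to be held in memory.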
    

Sometimes, when several pipelines are joined at the same time, the following error appears:

    2023-03-24 09:39:15,092 [ INFO] - processors=4, physical.memory.total=15.4G, physical.memory.free=7.3G, swap.space.total=0, swap.space.free=0, heap.memory.used=2.8G,
    heap.memory.free=1.1G, heap.memory.total=3.9G, heap.memory.max=7.1G,
    heap.memory.used/total=70.82%, heap.memory.used/max=38.70%, minor.gc.count=228,
    minor.gc.time=14798ms, major.gc.count=5, major.gc.time=2766ms, load.process=0.00%,
    load.system=0.00%, load.systemAverage=2.28, thread.count=84, thread.peakCount=119,
    cluster.timeDiff=0, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0,
    executor.q.client.query.size=0, executor.q.client.blocking.size=0, executor.q.query.size=0,
    executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0,
    executor.q.operations.size=0, executor.q.priorityOperation.size=0,
    operations.completed.count=5818629, executor.q.mapLoad.size=0, executor.q.mapLoadAllKeys.size=0,
    executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0,
    operations.pending.invocations.percentage=0.00%, operations.pending.invocations.count=0,
    proxy.count=10, clientEndpoint.count=0, connection.active.count=0, client.connection.count=0,
    connection.count=0
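
To watch how the heap grows while a join runs, similar figures can be read from inside the application. This is a minimal sketch using the standard `Runtime` API. `HeapSnapshot` is a hypothetical helper, and the assumption that its fields line up with `heap.memory.used`, `heap.memory.total`, and `heap.memory.max` in the log above is unverified.

```java
// Hypothetical helper; the mapping to the Hazelcast log fields is an assumption.
class HeapSnapshot {
    final long usedBytes;
    final long totalBytes;
    final long maxBytes;

    HeapSnapshot(long usedBytes, long totalBytes, long maxBytes) {
        this.usedBytes = usedBytes;
        this.totalBytes = totalBytes;
        this.maxBytes = maxBytes;
    }

    /** Reads the current heap figures from the running JVM. */
    public static HeapSnapshot take() {
        Runtime rt = Runtime.getRuntime();
        long total = rt.totalMemory();   // heap currently reserved by the JVM
        long free = rt.freeMemory();     // unused part of that reserved heap
        return new HeapSnapshot(total - free, total, rt.maxMemory());
    }

    /** Used heap as a percentage of the currently reserved heap. */
    public double usedOverTotalPercent() {
        return 100.0 * usedBytes / totalBytes;
    }

    /** Used heap as a percentage of the maximum heap the JVM may grow to. */
    public double usedOverMaxPercent() {
        return 100.0 * usedBytes / maxBytes;
    }

    @Override
    public String toString() {
        long mib = 1024L * 1024L;
        return String.format(
                "heap used=%d MiB, total=%d MiB, max=%d MiB, used/total=%.2f%%, used/max=%.2f%%",
                usedBytes / mib, totalBytes / mib, maxBytes / mib,
                usedOverTotalPercent(), usedOverMaxPercent());
    }
}
```

Logging `HeapSnapshot.take()` before and after each job would show whether used heap returns to its earlier level once a job completes, which is one way to tell a leak apart from a large but temporary working set.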

    

How can I improve its performance?

java spring-boot hazelcast hazelcast-jet