火花流在添加JsonArray时抛出java.util.ConcurrentModificationException

问题描述 投票:0回答:1

在以下过程中出现错误。我知道似乎引发了此错误,因为它试图读取分区(rec)中的整个记录​​,但同时尝试将其分配给字符串(Str = jsonArray.toJSONString();)在Spark Streaming配置中使用5秒的批处理间隔。这个代码有什么建议吗?请帮助。谢谢

此行中有错误:

 Str=jsonArray.toJSONString();

下面是我的全部功能:

MapRowRDD.foreachRDD(rdd ->{
            rdd.foreachPartition(
                    rec-> {
                        while(rec.hasNext()) {
                            JSONObject record = rec.next();
                            i=i+1;
                          if(TimeUnit.MINUTES.convert(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                  .parse((String) record.get("DATE_TRANSACTION"))
                                  .getTime()-DateUtils.addMinutes(new Date(), -5)
                                  .getTime(),TimeUnit.MILLISECONDS)>=0 || Integer.valueOf((String) record.get("EVENT_TYPE"))<0) {
                              jsonArray.add(record);
                            if(i % v_BATCH_WINDOW == 0)
                            {   
                                try {
                                    Str=jsonArray.toJSONString();
                                    HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                                    out_JSON=Response.getBody();
                                    log.warn("Response : " + out_JSON.toString());
                                }
                                catch(UnirestConfigException e){
                                    System.out.println("UnirestConfigException occured "+ e.toString());
                                    e.printStackTrace();
                                }
                                jsonArray.clear();
                                i=0;
                            }
                          }
                        publishToKafka(record.toString(), outputTopic, props);
                        }
                        Str=jsonArray.toJSONString();
                        if (!Str.equals("[]") && Str!=null && !Str.isEmpty()) {
                            HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                        }
                        jsonArray.clear();
                        i=0;
                    }   
                    );
        });
java arrays exception spark-streaming concurrentmodification
1个回答
0
投票

如您所知,当您通过不同的线程同时修改和迭代同一集合时,会发生此异常。 jsonArray不是线程安全的,请用Vector等一些线程安全的集合替换它,然后查看此方法的效果

© www.soinside.com 2019 - 2024. All rights reserved.