在以下过程中出现错误。我知道似乎引发了此错误,因为它试图读取分区(rec)中的整个记录,但同时尝试将其分配给字符串(Str = jsonArray.toJSONString();)在Spark Streaming配置中使用5秒的批处理间隔。这个代码有什么建议吗?请帮助。谢谢
此行中有错误:
Str=jsonArray.toJSONString();
下面是我的全部功能:
MapRowRDD.foreachRDD(rdd ->{
rdd.foreachPartition(
rec-> {
while(rec.hasNext()) {
JSONObject record = rec.next();
i=i+1;
if(TimeUnit.MINUTES.convert(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.parse((String) record.get("DATE_TRANSACTION"))
.getTime()-DateUtils.addMinutes(new Date(), -5)
.getTime(),TimeUnit.MILLISECONDS)>=0 || Integer.valueOf((String) record.get("EVENT_TYPE"))<0) {
jsonArray.add(record);
if(i % v_BATCH_WINDOW == 0)
{
try {
Str=jsonArray.toJSONString();
HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
out_JSON=Response.getBody();
log.warn("Response : " + out_JSON.toString());
}
catch(UnirestConfigException e){
System.out.println("UnirestConfigException occured "+ e.toString());
e.printStackTrace();
}
jsonArray.clear();
i=0;
}
}
publishToKafka(record.toString(), outputTopic, props);
}
Str=jsonArray.toJSONString();
if (!Str.equals("[]") && Str!=null && !Str.isEmpty()) {
HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
}
jsonArray.clear();
i=0;
}
);
});
如您所知,当您通过不同的线程同时修改和迭代同一集合时,会发生此异常。 jsonArray不是线程安全的,请用Vector等一些线程安全的集合替换它,然后查看此方法的效果