我有一个 spring jpa 存储库和一个用于检索数据库中对象列表的服务
public interface ObjectInDbRepository extends JpaRepository<ObjectInDb, Long> {
List<ObjectInDb> findByObjA(ObjA name);
List<ObjectInDb> findByObjB(ObjB value);
}
我的服务是
@Service
@Slf4j
@RequiredArgsConstructor
public class ObjectInDbService implements IObjectInDbService {
private final ObjectInDbRepository objectInDbRepository;
@Override
public List<ObjectInDb> findObjectInDbCodes(ParentObj obj) {
if (null == obj) {
return Collections.emptyList();
}
if (obj instanceof ObjA) {
return objectInDbRepository.findByObjA((ObjA) obj);
} else {
return objectInDbRepository.findByObjB((ObjB) obj);
}
}
}
当我的代码在 ConcurrentHashMap 流上调用这些服务时,无论使用流还是并行流,我都会得到不同的行为
@Service
@Slf4j
@RequiredArgsConstructor
public class MySchedulerService {
private final ObjectInDbService objectInDbService;
@Scheduled(fixedDelayString = "${scheduler}")
public void scheduleFixedDelayTask() {
// Init of the map we will parse
Map<String, List<ParentObj>> parentObjMap= new ConcurrentHashMap<>();
// Map is filled with 2 items, each of them having 1 list of 1 item
// This is where the test is done: fails if parallelStream, works if stream()
parentObjMap.entrySet().parallelStream().forEach(parentObjItem -> {
for(ParentObj parentObj : parentObjItem.getValue()) {
List<ObjectInDb> fromDB = objectInDbService.findObjectInDbCodes(parentObj);
log.info("TEST " + parentObjItem.getKey() + " " + fromDB);
}
});
}
}
我收到以下日志:
TEST Ref1 [ObjectInDb@1]
TEST Ref2 []
当我有parallelStream但有“正常”stream()时我得到
TEST Ref1 [ObjectInDb@1]
TEST Ref2 [ObjectInDb@2]
基本上,当我使用流时,我会从数据库中获取包含地图的 2 项和对象列表(包含每一项)的日志。
但是当我使用parallelStream时,我只得到第一个循环的数据库结果。
当我使用 ConcurrentHashMap 时,我真的很难理解为什么会发生这种行为变化
非常感谢您的支持
朱利安
PS:如果您需要查看地图是如何构建的:
// Init of our lists
List<ObjA> objAList = ...//We retreive a list of ObjA (extends ParentObj)
List<ObjB> objBList = ...//We retreive a list of ObjB (extends ParentObj)
// We update the map by groupping by Reference the objAList and objBList
for (Map.Entry<String, List<ObjA>> objAListPerRef : objAList.stream()
.collect(Collectors.groupingBy(ObjA::getReference)).entrySet()) {
parentObjMap.put(objAListPerRef.getKey(), new ArrayList<>(objAListPerRef.getValue()));
}
for (Map.Entry<String, List<ObjB>> objBListPerRef : objBList.stream()
.collect(Collectors.groupingBy(ObjB::getReference)).entrySet()) {
List<ParentObj> parentObjForThisRef = parentObjMap.getOrDefault(objBListPerRef.getKey(),
new ArrayList<>());
parentObjForThisRef.addAll(objBListPerRef.getValue());
parentObjMap.put(objBListPerRef.getKey(), parentObjForThisRef);
}
//for my test, parentObjMap has 2 items, each of them having 1 item in the list (the map value)
当您使用parallelStream()时,操作将跨多个线程并发处理。这可能会导致对共享数据的并发访问,例如您的parentObjMap,这可能不是线程安全的。
在你的情况下,你应该确保你的parentObjMap是线程安全的。实现此目的的一种方法是使用 ConcurrentHashMap代替常规 HashMap,如下所示:
// Initialize parentObjMap as a ConcurrentHashMap
ConcurrentHashMap<String, List<ParentObj>> parentObjMap = new ConcurrentHashMap<>();
// ... Rest of your code ...
parentObjMap.entrySet().parallelStream().forEach(parentObjItem -> {
for (ParentObj parentObj : parentObjItem.getValue()) {
List<ObjectInDb> fromDB = objectInDbService.findObjectInDbCodes(parentObj);
log.info("TEST " + parentObjItem.getKey() + " " + fromDB);
}
});
此外,请确保在 objectInDbService.findObjectInDbCodes 方法中访问的任何其他共享数据结构也是线程安全的,以避免使用并行流时出现潜在问题。
祝你好运:)