ParallelStream 未按我的预期工作(行为与使用 Stream() 时不同

问题描述 投票:0回答:1

我有一个 spring jpa 存储库和一个用于检索数据库中对象列表的服务

public interface ObjectInDbRepository extends JpaRepository<ObjectInDb, Long> {
    List<ObjectInDb> findByObjA(ObjA name);

    List<ObjectInDb> findByObjB(ObjB value);
}

我的服务是

@Service
@Slf4j
@RequiredArgsConstructor
public class ObjectInDbService implements IObjectInDbService {
    private final ObjectInDbRepository objectInDbRepository;
       
    @Override
    public List<ObjectInDb> findObjectInDbCodes(ParentObj obj) {
        if (null == obj) {
            return Collections.emptyList();
        }
        if (obj instanceof ObjA) {
            return objectInDbRepository.findByObjA((ObjA) obj);
        } else {
            return objectInDbRepository.findByObjB((ObjB) obj);
        }
    }
}

当我的代码在 ConcurrentHashMap 流上调用这些服务时,无论使用流还是并行流,我都会得到不同的行为

@Service
@Slf4j
@RequiredArgsConstructor
public class MySchedulerService {
    private final ObjectInDbService objectInDbService;

    @Scheduled(fixedDelayString = "${scheduler}")
    public void scheduleFixedDelayTask() {
            // Init of the map we will parse
            Map<String, List<ParentObj>> parentObjMap= new ConcurrentHashMap<>();
            // Map is filled with 2 items, each of them having 1 list of 1 item

            // This is where the test is done: fails if parallelStream, works if stream() 
           parentObjMap.entrySet().parallelStream().forEach(parentObjItem -> {
                for(ParentObj parentObj : parentObjItem.getValue()) {
                    List<ObjectInDb> fromDB = objectInDbService.findObjectInDbCodes(parentObj);
                    log.info("TEST " + parentObjItem.getKey() + " " + fromDB);
                }
            });
    }
}

我收到以下日志:

TEST Ref1 [ObjectInDb@1]
TEST Ref2 []

当我有parallelStream但有“正常”stream()时我得到

TEST Ref1 [ObjectInDb@1]
TEST Ref2 [ObjectInDb@2]

基本上,当我使用流时,我会从数据库中获取包含地图的 2 项和对象列表(包含每一项)的日志。

但是当我使用parallelStream时,我只得到第一个循环的数据库结果。

当我使用 ConcurrentHashMap 时,我真的很难理解为什么会发生这种行为变化

非常感谢您的支持

朱利安

PS:如果您需要查看地图是如何构建的:

        // Init of our lists
        List<ObjA> objAList = ...//We retreive a list of ObjA (extends ParentObj)
        List<ObjB> objBList = ...//We retreive a list of ObjB (extends ParentObj)
        
        // We update the map by groupping by Reference the objAList and objBList
        for (Map.Entry<String, List<ObjA>> objAListPerRef : objAList.stream()
                .collect(Collectors.groupingBy(ObjA::getReference)).entrySet()) {
            parentObjMap.put(objAListPerRef.getKey(), new ArrayList<>(objAListPerRef.getValue()));
        }
        for (Map.Entry<String, List<ObjB>> objBListPerRef : objBList.stream()
                .collect(Collectors.groupingBy(ObjB::getReference)).entrySet()) {
            List<ParentObj> parentObjForThisRef = parentObjMap.getOrDefault(objBListPerRef.getKey(),
                    new ArrayList<>());
            parentObjForThisRef.addAll(objBListPerRef.getValue());
            parentObjMap.put(objBListPerRef.getKey(), parentObjForThisRef);
        }
        
        //for my test, parentObjMap has 2 items, each of them having 1 item in the list (the map value)
java java-stream parallelstream
1个回答
0
投票

当您使用parallelStream()时,操作将跨多个线程并发处理。这可能会导致对共享数据的并发访问,例如您的parentObjMap,这可能不是线程安全的。

在你的情况下,你应该确保你的parentObjMap是线程安全的。实现此目的的一种方法是使用 ConcurrentHashMap代替常规 HashMap,如下所示:

// Initialize parentObjMap as a ConcurrentHashMap
ConcurrentHashMap<String, List<ParentObj>> parentObjMap = new ConcurrentHashMap<>();

// ... Rest of your code ...

parentObjMap.entrySet().parallelStream().forEach(parentObjItem -> {
    for (ParentObj parentObj : parentObjItem.getValue()) {
        List<ObjectInDb> fromDB = objectInDbService.findObjectInDbCodes(parentObj);
        log.info("TEST " + parentObjItem.getKey() + " " + fromDB);
    }
});

此外,请确保在 objectInDbService.findObjectInDbCodes 方法中访问的任何其他共享数据结构也是线程安全的,以避免使用并行流时出现潜在问题。

祝你好运:)

© www.soinside.com 2019 - 2024. All rights reserved.