修改并行流中的本地列表

问题描述 投票:0回答:1

我有一个如下的方法:

  public void method ()
  {
        List<String> list1 = someOperation();
        List<List2Class> list2;
        long failedCount = 0;
        for (String element : list1) {
            try {
                list2 = someclass.method1(element);
                list2 = someclass.method2(element);
                someclass.saveToDB(list2);
            
            } catch (Exception e) {
                failedCount++;
              
            }
        }
       //some checks on failedCount
    }

我想将 for 循环转换为并行流,有人可以告诉我上述方法中的代码应该进行哪些更改吗? PS - method1 和 method2 返回 list2 的修改版本。

java java-8 parallel-processing java-stream
1个回答
0
投票

这里的逻辑基本就是“上次成功操作的结果”。

假设您不需要

failedCount
(您没有显示它正在使用),您可以这样做:将成功的操作映射到当前的
Optional
,将失败的操作映射到不存在的
Optional
;并只取最后一个可选的:

Optional<List<List2Class>> opt = list1.stream()
    .flatMap(element -> Stream.of(runOperation(someclass::method1, element), runOperation(someclass::method2, element))
    .reduce((a, b) -> !b.isPresent() ? a : b);

其中

runOperation
类似于:

Optional<List<List2Class>> runOperation(Function<String, List<List2Class>> operation, String parameter) {
  try {
    return Optional.of(operation.apply(parameter));
  } catch (Exception e) {
    return Optional.absent();
  }
}

然后,您需要决定如果没有操作成功,

list2
应该具有什么值。

如果您确实需要

failedCount
,您可以稍微分解一下:

Stream<Optional<List<List2Class>>> intermediate =
    list1.stream()
        .flatMap(element -> Stream.of(runOperation(someclass::method1, element), runOperation(someclass::method2, element));

Map<Boolean, List<Optional<List<List2Class>>>> partitioned =
    intermediate.collect(Collectors.partitioningBy(Optional::isPresent));

现在

partitioned.get(true)
有所有成功的结果,而
partitioned.get(false)
有所有失败的结果。所以:

Optional<List<List2Class>> opt = partitioned.get(true).stream().reduce((a, b) -> !b.isPresent() ? a : b);
long failedCount = partitioned.get(false).size();
© www.soinside.com 2019 - 2024. All rights reserved.