我想知道并行流的流管道中的流操作是否等待先前的流操作完成对所有流元素的处理。
如果我具有以下流管道:
List <String> parsedNumbers = IntStream.range(1, 6)
.parallel()
.map(String::parseInt)
.map(integerAsString => {
System.out.println("First print statement: " + integerAsString);
return integerAsString;
})
.map(integerAsString => {
System.out.println("Second print statement: " + integerAsString);
return integerAsString;
})
.collect(Collectors.toList());
[是否可能已经为元素X调用了System.out.println("First print statement: " + integerAsString)
,但仍对流中的另一个元素Y进行了String::parseInt
操作?
可能此代码的输出如下:
第一份印刷声明:1第一份印刷声明:2第一份印刷声明:3第二份印刷声明:1第二次印刷声明:2第一份印刷声明:4第二次印刷声明:3第二印刷声明:4第一份印刷声明:5第二次印刷声明:5
将会总是像这样:
第一份印刷声明:1第一份印刷声明:2第一份印刷声明:3第一份印刷声明:4第一份印刷声明:5第二份印刷声明:1第二次印刷声明:2第二次印刷声明:3第二印刷声明:4第二次印刷声明:5
是的。 Intermediate
阶段可以按任何顺序执行,terminal
操作具有定义的顺序,如果流的源具有顺序(例如,与Set
不同),和流本身不会改变该顺序命令(调用unordered
-尽管目前执行的并不多)。
也就是说:您真的不知道在给定的时间点哪个元素将流经一个阶段,对于并行流,如何处理元素没有顺序。
更大的问题是为什么,您在乎吗?中间操作应该是无副作用的,依赖任何命令都是一个坏主意。
即使顺序流也不保证处理顺序。如果数据有一个,则只有最终结果与遇到顺序一致。
当您运行以下顺序代码时
List<String> parsedNumbers = IntStream.range(1, 6)
.mapToObj(String::valueOf)
.map(integerAsString -> {
System.out.println("First print statement: " + integerAsString);
return integerAsString;
})
.map(integerAsString -> {
System.out.println("Second print statement: " + integerAsString);
return integerAsString;
})
.collect(Collectors.toList());
它将打印
First print statement: 1
Second print statement: 1
First print statement: 2
Second print statement: 2
First print statement: 3
Second print statement: 3
First print statement: 4
Second print statement: 4
First print statement: 5
Second print statement: 5
显示流无法正常运行。该参考实现有一个明确的偏好,那就是在处理下一个元素之前,先将每个元素传递给整个流。启用并行处理时,将在每个CPU内核上执行相同的处理逻辑。
所以当我使用时
List<String> parsedNumbers = IntStream.range(1, 6)
.parallel()
.mapToObj(String::valueOf)
.map(integerAsString -> {
System.out.println("First print statement: " + integerAsString);
return integerAsString;
})
.map(integerAsString -> {
System.out.println("Second print statement: " + integerAsString);
return integerAsString;
})
.collect(Collectors.toList());
我在计算机上收到类似的内容:
First print statement: 5
First print statement: 2
First print statement: 1
First print statement: 4
First print statement: 3
Second print statement: 5
Second print statement: 2
Second print statement: 1
Second print statement: 4
Second print statement: 3
这看起来像是在第二个语句之前处理了第一条打印语句,但这只是一个巧合,它具有比流元素更多的CPU内核,而且运气还不错。例如,当我将range(1, 6)
更改为range(1, 18)
时,会得到类似的信息>
First print statement: 6 First print statement: 10 First print statement: 9 First print statement: 3 First print statement: 15 First print statement: 5 Second print statement: 9 First print statement: 11 First print statement: 8 Second print statement: 3 Second print statement: 11 Second print statement: 5 Second print statement: 10 Second print statement: 6 First print statement: 7 First print statement: 12 Second print statement: 8 Second print statement: 15 Second print statement: 12 Second print statement: 7 First print statement: 2 First print statement: 17 First print statement: 14 First print statement: 4 Second print statement: 14 Second print statement: 17 Second print statement: 2 First print statement: 1 First print statement: 16 First print statement: 13 Second print statement: 16 Second print statement: 1 Second print statement: 4 Second print statement: 13
不仅不能保证处理顺序,也不能保证要处理哪些元素,例如
IntStream.range(1, 30) .filter(i -> i%13 == 1) .peek(i -> System.out.println("processing "+i)) .parallel() .findFirst() .ifPresent(i -> System.out.println("result is "+i));
根据我的设置制作
processing 14 processing 1 processing 27 result is 1
因此,尽管结果保证为
1
,这是遇到顺序中的第一个匹配元素,但不保证不会处理其他按照遇到顺序排列的元素。