如何将对象流分割成更小的块,这样它们就不会传递 1×1 的对象,而是传入大小为 N 的列表:
只是为了解释我的意图,但可能有所不同:
objectStream
.map(do something here)
.peek(list -> each list of size N or the last one shorter than N)
....
现在这对您没有太大帮助,但未来的 JDK 可能允许您使用流收集器来执行此操作,请参阅JEP461。
JEP461 中提供的示例展示了更改流元素以进行下游处理、处理多个元素的中间操作或更改流元素类型的不同方法。
这展示了如何将元素分组为 2/3/4 个元素的窗口大小,并以 --enable-preview
模式在最新的
JDK22中运行:
Stream.of(0,1,2,3,4,5,6,7,8,9).gather(fixedWindow(2)).forEach(System.out::println);
=>
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
Stream.of(0,1,2,3,4,5,6,7,8,9).gather(fixedWindow(3)).forEach(System.out::println);
=>
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]
Stream.of(0,1,2,3,4,5,6,7,8,9).gather(fixedWindow(4)).forEach(System.out::println);
=>
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9]
这适用于固定窗口收集(从 JEP461 复制):
static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) {
// Validate input
if (windowSize < 1)
throw new IllegalArgumentException("window size must be non-zero");
// This gatherer is inherently order-dependent,
// so it should not be parallelized
return Gatherer.ofSequential(
// The initializer creates an ArrayList which holds the current
// open window
() -> new ArrayList<TR>(windowSize),
// The integrator is invoked for each element consumed
Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
// Add the element to the current open window
window.add(element);
// Until we reach our desired window size,
// return true to signal that more elements are desired
if (window.size() < windowSize)
return true;
// When window is full, close it by creating a copy
var result = new ArrayList<TR>(window);
// Clear the window so the next can be started
window.clear();
// Send the closed window downstream
return downstream.push(result);
}),
// The combiner is omitted since this operation is intrinsically sequential,
// and thus cannot be parallelized
// The finisher runs when there are no more elements to pass from the upstream
(window, downstream) -> {
// If the downstream still accepts more elements and the current
// open window is non-empty then send a copy of it downstream
if(!downstream.isRejecting() && !window.isEmpty()) {
downstream.push(new ArrayList<TR>(window));
window.clear();
}
}
);
}