如何对流进行分块并保持其继续运行

问题描述 投票:0回答:1

如何将对象流分割成更小的块,这样它们就不会传递 1×1 的对象,而是传入大小为 N 的列表:

只是为了解释我的意图,但可能有所不同:

objectStream
   .map(do something here)
   .peek(list -> each list of size N or the last one shorter than N)
   ....
java java-stream
1个回答
1
投票

现在这对您没有太大帮助,但未来的 JDK 可能允许您使用流收集器来执行此操作,请参阅JEP461

JEP461 中提供的示例展示了更改流元素以进行下游处理、处理多个元素的中间操作或更改流元素类型的不同方法。

这展示了如何将元素分组为 2/3/4 个元素的窗口大小,并以 --enable-preview 模式在最新的

JDK22
中运行:

Stream.of(0,1,2,3,4,5,6,7,8,9).gather(fixedWindow(2)).forEach(System.out::println);
=>
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]

Stream.of(0,1,2,3,4,5,6,7,8,9).gather(fixedWindow(3)).forEach(System.out::println);
=>
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]

Stream.of(0,1,2,3,4,5,6,7,8,9).gather(fixedWindow(4)).forEach(System.out::println);
=>
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9]

这适用于固定窗口收集(从 JEP461 复制):

static <TR> Gatherer<TR, ?, List<TR>> fixedWindow(int windowSize) {

    // Validate input
    if (windowSize < 1)
      throw new IllegalArgumentException("window size must be non-zero");

    // This gatherer is inherently order-dependent,
    // so it should not be parallelized
    return Gatherer.ofSequential(

            // The initializer creates an ArrayList which holds the current
            // open window
            () -> new ArrayList<TR>(windowSize),

            // The integrator is invoked for each element consumed
            Gatherer.Integrator.ofGreedy((window, element, downstream) -> {

                // Add the element to the current open window
                window.add(element);

                // Until we reach our desired window size,
                // return true to signal that more elements are desired
                if (window.size() < windowSize)
                    return true;

                // When window is full, close it by creating a copy
                var result = new ArrayList<TR>(window);

                // Clear the window so the next can be started
                window.clear();

                // Send the closed window downstream
                return downstream.push(result);

            }),

            // The combiner is omitted since this operation is intrinsically sequential,
            // and thus cannot be parallelized

            // The finisher runs when there are no more elements to pass from the upstream
            (window, downstream) -> {
                // If the downstream still accepts more elements and the current
                // open window is non-empty then send a copy of it downstream
                if(!downstream.isRejecting() && !window.isEmpty()) {
                    downstream.push(new ArrayList<TR>(window));
                    window.clear();
                }
            }

    );
}
© www.soinside.com 2019 - 2024. All rights reserved.