如何将Java流转变成滑动窗口?

问题描述 投票:0回答:8

将流转换为滑动窗口的推荐方法是什么?

例如,在 Ruby 中你可以使用 each_cons:

irb(main):020:0> [1,2,3,4].each_cons(2) { |x| puts x.inspect }
[1, 2]
[2, 3]
[3, 4]
=> nil
irb(main):021:0> [1,2,3,4].each_cons(3) { |x| puts x.inspect }
[1, 2, 3]
[2, 3, 4]
=> nil

在Guava中,我只找到了Iterators#partition,有相关但没有滑动窗口:

final Iterator<List<Integer>> partition =
   Iterators.partition(IntStream.range(1, 5).iterator(), 3);
partition.forEachRemaining(System.out::println);
-->
[1, 2, 3]
[4]
java java-8 java-stream
8个回答
36
投票

API中没有这样的功能,因为它同时支持顺序和并行处理,并且很难为任意流源提供高效的滑动窗口函数并行处理(即使高效的对并行处理也很难,我实现了它,所以我知道)。

但是,如果您的源是具有快速随机访问功能的

List
,您可以使用
subList()
方法来获得所需的行为,如下所示:

public static <T> Stream<List<T>> sliding(List<T> list, int size) {
    if(size > list.size()) 
        return Stream.empty();
    return IntStream.range(0, list.size()-size+1)
                    .mapToObj(start -> list.subList(start, start+size));
}

类似的方法实际上可以在我的StreamEx库中使用:请参阅

StreamEx.ofSubLists()

还有一些其他第三方解决方案不关心并行处理,并使用一些内部缓冲区提供滑动功能。例如,质子包

StreamUtils.windowed


14
投票

如果你愿意使用第三方库并且不需要并行性,那么jOOλ提供了如下的SQL风格的窗口函数

int n = 2;

System.out.println(
Seq.of(1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

屈服

[[1, 2], [2, 3], [3, 4]]

还有

int n = 3;

System.out.println(
Seq.of(1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

屈服

[[1, 2, 3], [2, 3, 4]]

这是一篇关于其工作原理的博客文章

免责声明:我在jOOλ背后的公司工作


8
投票

另一个选项 cyclops-react 构建在 jOOλ 的 Seq 接口(和 JDK 8 Stream)之上,但 simple-react 重新构建了并发/并行性(如果这对您很重要 - 通过创建 Futures 流)。

您可以将 Lukas 强大的窗口函数与任一库一起使用(因为我们扩展了很棒的 jOOλ),但还有一个滑动运算符,我认为在这种情况下简化了事情并且适合在无限流中使用(即它不消耗流,但在流经时缓冲值)。

使用 ReactiveSeq 它看起来像这样 -

ReactiveSeq.of(1, 2, 3, 4)
           .sliding(2)
           .forEach(System.out::println);

使用 LazyFutureStream 可能类似于下面的示例 -

 LazyFutureStream.iterate(1,i->i+1)
                 .sliding(3,2) //lists of 3, increment 2
                 .forEach(System.out::println);

在 cyclops-streams StreamUtils 类中还提供了用于在 java.util.stream.Stream 上创建滑动视图的等效静态方法。

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

如果您想直接使用每个滑动视图,您可以使用返回列表转换器的 sliderT 运算符。例如,为每个滑动视图中的每个元素添加一个数字,然后将每个滑动窗口减少为其元素的总和:-

        ReactiveSeq<Integer> windowsSummed = ReactiveSeq.fromIterable(data)
                                                        .slidingT(3)
                                                        .map(a->a+toAdd)
                                                        .reduce(0,(a,b)->a+b)
                                                        .stream();

免责声明:我在 cyclops-react 背后的公司工作


6
投票

如果您想将 Scala 持久集合的全部功能引入 Java,您可以使用库 Vavr,以前称为 Javaslang。

// this imports List, Stream, Iterator, ...
import io.vavr.collection.*;

Iterator.range(1, 5).sliding(3)
        .forEach(System.out::println);
// --->
// List(1, 2, 3)
// List(2, 3, 4)

Iterator.range(1, 5).sliding(2, 3)
        .forEach(System.out::println);
// --->
// List(1, 2)
// List(4)

Iterator.ofAll(javaStream).sliding(3);

您不仅可以使用 Iterator,这也适用于几乎任何其他 Vavr 集合:Array、Vector、List、Stream、Queue、HashSet、LinkedHashSet、TreeSet,...

(Javaslang 2.1.0-alpha 概述)

免责声明:我是 Vavr(以前称为 Javaslang)的创建者。


3
投票

我在 Tomek 的 Nurkiewicz 博客上找到了解决方案(https://www.nurkiewicz.com/2014/07/grouping-sampling-and-batching-custom.html)。您可以使用以下

SlidingCollector

public class SlidingCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> {

    private final int size;
    private final int step;
    private final int window;
    private final Queue<T> buffer = new ArrayDeque<>();
    private int totalIn = 0;

    public SlidingCollector(int size, int step) {
        this.size = size;
        this.step = step;
        this.window = max(size, step);
    }

    @Override
    public Supplier<List<List<T>>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<List<T>>, T> accumulator() {
        return (lists, t) -> {
            buffer.offer(t);
            ++totalIn;
            if (buffer.size() == window) {
                dumpCurrent(lists);
                shiftBy(step);
            }
        };
    }

    @Override
    public Function<List<List<T>>, List<List<T>>> finisher() {
        return lists -> {
            if (!buffer.isEmpty()) {
                final int totalOut = estimateTotalOut();
                if (totalOut > lists.size()) {
                    dumpCurrent(lists);
                }
            }
            return lists;
        };
    }

    private int estimateTotalOut() {
        return max(0, (totalIn + step - size - 1) / step) + 1;
    }

    private void dumpCurrent(List<List<T>> lists) {
        final List<T> batch = buffer.stream().limit(size).collect(toList());
        lists.add(batch);
    }

    private void shiftBy(int by) {
        for (int i = 0; i < by; i++) {
            buffer.remove();
        }
    }

    @Override
    public BinaryOperator<List<List<T>>> combiner() {
        return (l1, l2) -> {
            throw new UnsupportedOperationException("Combining not possible");
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.noneOf(Characteristics.class);
    }

}

下面是 Tomekin Spock 的一些示例(我希望它是可读的):

import static com.nurkiewicz.CustomCollectors.sliding

@Unroll
class CustomCollectorsSpec extends Specification {

    def "Sliding window of #input with size #size and step of 1 is #output"() {
        expect:
        input.stream().collect(sliding(size)) == output

        where:
        input  | size | output
        []     | 5    | []
        [1]    | 1    | [[1]]
        [1, 2] | 1    | [[1], [2]]
        [1, 2] | 2    | [[1, 2]]
        [1, 2] | 3    | [[1, 2]]
        1..3   | 3    | [[1, 2, 3]]
        1..4   | 2    | [[1, 2], [2, 3], [3, 4]]
        1..4   | 3    | [[1, 2, 3], [2, 3, 4]]
        1..7   | 3    | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]
        1..7   | 6    | [1..6, 2..7]
    }

    def "Sliding window of #input with size #size and no overlapping is #output"() {
        expect:
        input.stream().collect(sliding(size, size)) == output

        where:
        input | size | output
        []    | 5    | []
        1..3  | 2    | [[1, 2], [3]]
        1..4  | 4    | [1..4]
        1..4  | 5    | [1..4]
        1..7  | 3    | [1..3, 4..6, [7]]
        1..6  | 2    | [[1, 2], [3, 4], [5, 6]]
    }

    def "Sliding window of #input with size #size and some overlapping is #output"() {
        expect:
        input.stream().collect(sliding(size, 2)) == output

        where:
        input | size | output
        []    | 5    | []
        1..4  | 5    | [[1, 2, 3, 4]]
        1..7  | 3    | [1..3, 3..5, 5..7]
        1..6  | 4    | [1..4, 3..6]
        1..9  | 4    | [1..4, 3..6, 5..8, 7..9]
        1..10 | 4    | [1..4, 3..6, 5..8, 7..10]
        1..11 | 4    | [1..4, 3..6, 5..8, 7..10, 9..11]
    }

    def "Sliding window of #input with size #size and gap of #gap is #output"() {
        expect:
        input.stream().collect(sliding(size, size + gap)) == output

        where:
        input | size | gap | output
        []    | 5    | 1   | []
        1..9  | 4    | 2   | [1..4, 7..9]
        1..10 | 4    | 2   | [1..4, 7..10]
        1..11 | 4    | 2   | [1..4, 7..10]
        1..12 | 4    | 2   | [1..4, 7..10]
        1..13 | 4    | 2   | [1..4, 7..10, [13]]
        1..13 | 5    | 1   | [1..5, 7..11, [13]]
        1..12 | 5    | 3   | [1..5, 9..12]
        1..13 | 5    | 3   | [1..5, 9..13]
    }

    def "Sampling #input taking every #nth th element is #output"() {
        expect:
        input.stream().collect(sliding(1, nth)) == output

        where:
        input  | nth | output
        []     | 1   | []
        []     | 5   | []
        1..3   | 5   | [[1]]
        1..6   | 2   | [[1], [3], [5]]
        1..10  | 5   | [[1], [6]]
        1..100 | 30  | [[1], [31], [61], [91]]
    }
}

2
投票

不太确定这是否“安全”或“良好”,但也许你们让我知道。

public static <T> Stream<List<T>> sliding(Stream<T> stream, int window) {
    Queue<T> queue = new LinkedList<>();
    return stream.dropWhile(item -> {
        if (queue.size() < window - 1) {
            queue.add(item);
            return true;
        }
        return false;
    }).map(item -> {
        queue.add(item);
        List<T> ret = queue.stream().toList();
        queue.remove();
        return ret;
    });
}

public static void main(String[] args) {
    sliding(Stream.of(1, 2, 3, 4, 5, 6, 7), 3).forEach(x -> System.out.println(x));
}
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]

1
投票

另一种选择是实现自定义 Spliterator,就像此处

import java.util.*;

public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {

    static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
        return StreamSupport.stream(
          new SlidingWindowSpliterator<>(stream, windowSize), false);
    }

    private final Queue<T> buffer;
    private final Iterator<T> sourceIterator;
    private final int windowSize;
    private final int size;

    private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
        this.buffer = new ArrayDeque<>(windowSize);
        this.sourceIterator = Objects.requireNonNull(source).iterator();
        this.windowSize = windowSize;
        this.size = calculateSize(source, windowSize);
    }

    @Override
    public boolean tryAdvance(Consumer<? super Stream<T>> action) {
        if (windowSize < 1) {
            return false;
        }

        while (sourceIterator.hasNext()) {
            buffer.add(sourceIterator.next());

            if (buffer.size() == windowSize) {
                action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
                buffer.poll();
                return sourceIterator.hasNext();
            }
        }

        return false;
    }

    @Override
    public Spliterator<Stream<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
       return size;
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | SIZED;
    }

    private static int calculateSize(Collection<?> source, int windowSize) {
        return source.size() < windowSize
          ? 0
          : source.size() - windowSize + 1;
    }
}

0
投票

JEP 461:Stream GatherersJava 22 预览语言功能添加了对此的内置支持:

Stream<List<Integer>> stream =
        Stream.of(0, 1, 2, 3, 4).gather(Gatherers.windowSliding(2));

这使用新的

Stream.gather
方法和新的内置
Gatherers.windowSliding
收集器将初始
Stream<T>
转换为成对
Stream<List<T>>

Java文档

Gatherer

将输入元素流转换为输出元素流的中间操作,可以选择在到达上游末尾时应用最终操作。 […]

[…]

聚集操作的例子有很多,包括但不限于:将元素分组(窗口函数);对连续相似的元素进行去重;增量累加功能(前缀扫描);增量重新排序功能等。类

Gatherers
提供了常见收集操作的实现。

Stream.gather

返回一个流,其中包含将给定收集器应用于该流的元素的结果。

Gatherers.windowSliding

返回一个 Gatherer,它将元素收集到给定大小的窗口(遇到有序的元素组)中,其中每个后续窗口都包含前一个窗口的所有元素(除了最近的元素之外),并在流中添加下一个元素。 […]

示例:

// will contain: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
List<List<Integer>> windows2 =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();

// will contain: [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
List<List<Integer>> windows6 =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(6)).toList();
© www.soinside.com 2019 - 2024. All rights reserved.