将流转换为滑动窗口的推荐方法是什么?
例如,在 Ruby 中你可以使用 each_cons:
irb(main):020:0> [1,2,3,4].each_cons(2) { |x| puts x.inspect }
[1, 2]
[2, 3]
[3, 4]
=> nil
irb(main):021:0> [1,2,3,4].each_cons(3) { |x| puts x.inspect }
[1, 2, 3]
[2, 3, 4]
=> nil
在Guava中,我只找到了Iterators#partition,有相关但没有滑动窗口:
final Iterator<List<Integer>> partition =
Iterators.partition(IntStream.range(1, 5).iterator(), 3);
partition.forEachRemaining(System.out::println);
-->
[1, 2, 3]
[4]
API中没有这样的功能,因为它同时支持顺序和并行处理,并且很难为任意流源提供高效的滑动窗口函数并行处理(即使高效的对并行处理也很难,我实现了它,所以我知道)。
但是,如果您的源是具有快速随机访问功能的
List
,您可以使用 subList()
方法来获得所需的行为,如下所示:
public static <T> Stream<List<T>> sliding(List<T> list, int size) {
if(size > list.size())
return Stream.empty();
return IntStream.range(0, list.size()-size+1)
.mapToObj(start -> list.subList(start, start+size));
}
类似的方法实际上可以在我的StreamEx库中使用:请参阅
StreamEx.ofSubLists()
。
还有一些其他第三方解决方案不关心并行处理,并使用一些内部缓冲区提供滑动功能。例如,质子包
StreamUtils.windowed
。
如果你愿意使用第三方库并且不需要并行性,那么jOOλ提供了如下的SQL风格的窗口函数
int n = 2;
System.out.println(
Seq.of(1, 2, 3, 4)
.window(0, n - 1)
.filter(w -> w.count() == n)
.map(w -> w.window().toList())
.toList()
);
屈服
[[1, 2], [2, 3], [3, 4]]
还有
int n = 3;
System.out.println(
Seq.of(1, 2, 3, 4)
.window(0, n - 1)
.filter(w -> w.count() == n)
.map(w -> w.window().toList())
.toList()
);
屈服
[[1, 2, 3], [2, 3, 4]]
免责声明:我在jOOλ背后的公司工作
另一个选项 cyclops-react 构建在 jOOλ 的 Seq 接口(和 JDK 8 Stream)之上,但 simple-react 重新构建了并发/并行性(如果这对您很重要 - 通过创建 Futures 流)。
您可以将 Lukas 强大的窗口函数与任一库一起使用(因为我们扩展了很棒的 jOOλ),但还有一个滑动运算符,我认为在这种情况下简化了事情并且适合在无限流中使用(即它不消耗流,但在流经时缓冲值)。
使用 ReactiveSeq 它看起来像这样 -
ReactiveSeq.of(1, 2, 3, 4)
.sliding(2)
.forEach(System.out::println);
使用 LazyFutureStream 可能类似于下面的示例 -
LazyFutureStream.iterate(1,i->i+1)
.sliding(3,2) //lists of 3, increment 2
.forEach(System.out::println);
在 cyclops-streams StreamUtils 类中还提供了用于在 java.util.stream.Stream 上创建滑动视图的等效静态方法。
StreamUtils.sliding(Stream.of(1,2,3,4),2)
.map(Pair::new);
如果您想直接使用每个滑动视图,您可以使用返回列表转换器的 sliderT 运算符。例如,为每个滑动视图中的每个元素添加一个数字,然后将每个滑动窗口减少为其元素的总和:-
ReactiveSeq<Integer> windowsSummed = ReactiveSeq.fromIterable(data)
.slidingT(3)
.map(a->a+toAdd)
.reduce(0,(a,b)->a+b)
.stream();
免责声明:我在 cyclops-react 背后的公司工作
如果您想将 Scala 持久集合的全部功能引入 Java,您可以使用库 Vavr,以前称为 Javaslang。
// this imports List, Stream, Iterator, ...
import io.vavr.collection.*;
Iterator.range(1, 5).sliding(3)
.forEach(System.out::println);
// --->
// List(1, 2, 3)
// List(2, 3, 4)
Iterator.range(1, 5).sliding(2, 3)
.forEach(System.out::println);
// --->
// List(1, 2)
// List(4)
Iterator.ofAll(javaStream).sliding(3);
您不仅可以使用 Iterator,这也适用于几乎任何其他 Vavr 集合:Array、Vector、List、Stream、Queue、HashSet、LinkedHashSet、TreeSet,...
(Javaslang 2.1.0-alpha 概述)
免责声明:我是 Vavr(以前称为 Javaslang)的创建者。
我在 Tomek 的 Nurkiewicz 博客上找到了解决方案(https://www.nurkiewicz.com/2014/07/grouping-sampling-and-batching-custom.html)。您可以使用以下
SlidingCollector
:
public class SlidingCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> {
private final int size;
private final int step;
private final int window;
private final Queue<T> buffer = new ArrayDeque<>();
private int totalIn = 0;
public SlidingCollector(int size, int step) {
this.size = size;
this.step = step;
this.window = max(size, step);
}
@Override
public Supplier<List<List<T>>> supplier() {
return ArrayList::new;
}
@Override
public BiConsumer<List<List<T>>, T> accumulator() {
return (lists, t) -> {
buffer.offer(t);
++totalIn;
if (buffer.size() == window) {
dumpCurrent(lists);
shiftBy(step);
}
};
}
@Override
public Function<List<List<T>>, List<List<T>>> finisher() {
return lists -> {
if (!buffer.isEmpty()) {
final int totalOut = estimateTotalOut();
if (totalOut > lists.size()) {
dumpCurrent(lists);
}
}
return lists;
};
}
private int estimateTotalOut() {
return max(0, (totalIn + step - size - 1) / step) + 1;
}
private void dumpCurrent(List<List<T>> lists) {
final List<T> batch = buffer.stream().limit(size).collect(toList());
lists.add(batch);
}
private void shiftBy(int by) {
for (int i = 0; i < by; i++) {
buffer.remove();
}
}
@Override
public BinaryOperator<List<List<T>>> combiner() {
return (l1, l2) -> {
throw new UnsupportedOperationException("Combining not possible");
};
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.noneOf(Characteristics.class);
}
}
下面是 Tomekin Spock 的一些示例(我希望它是可读的):
import static com.nurkiewicz.CustomCollectors.sliding
@Unroll
class CustomCollectorsSpec extends Specification {
def "Sliding window of #input with size #size and step of 1 is #output"() {
expect:
input.stream().collect(sliding(size)) == output
where:
input | size | output
[] | 5 | []
[1] | 1 | [[1]]
[1, 2] | 1 | [[1], [2]]
[1, 2] | 2 | [[1, 2]]
[1, 2] | 3 | [[1, 2]]
1..3 | 3 | [[1, 2, 3]]
1..4 | 2 | [[1, 2], [2, 3], [3, 4]]
1..4 | 3 | [[1, 2, 3], [2, 3, 4]]
1..7 | 3 | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]
1..7 | 6 | [1..6, 2..7]
}
def "Sliding window of #input with size #size and no overlapping is #output"() {
expect:
input.stream().collect(sliding(size, size)) == output
where:
input | size | output
[] | 5 | []
1..3 | 2 | [[1, 2], [3]]
1..4 | 4 | [1..4]
1..4 | 5 | [1..4]
1..7 | 3 | [1..3, 4..6, [7]]
1..6 | 2 | [[1, 2], [3, 4], [5, 6]]
}
def "Sliding window of #input with size #size and some overlapping is #output"() {
expect:
input.stream().collect(sliding(size, 2)) == output
where:
input | size | output
[] | 5 | []
1..4 | 5 | [[1, 2, 3, 4]]
1..7 | 3 | [1..3, 3..5, 5..7]
1..6 | 4 | [1..4, 3..6]
1..9 | 4 | [1..4, 3..6, 5..8, 7..9]
1..10 | 4 | [1..4, 3..6, 5..8, 7..10]
1..11 | 4 | [1..4, 3..6, 5..8, 7..10, 9..11]
}
def "Sliding window of #input with size #size and gap of #gap is #output"() {
expect:
input.stream().collect(sliding(size, size + gap)) == output
where:
input | size | gap | output
[] | 5 | 1 | []
1..9 | 4 | 2 | [1..4, 7..9]
1..10 | 4 | 2 | [1..4, 7..10]
1..11 | 4 | 2 | [1..4, 7..10]
1..12 | 4 | 2 | [1..4, 7..10]
1..13 | 4 | 2 | [1..4, 7..10, [13]]
1..13 | 5 | 1 | [1..5, 7..11, [13]]
1..12 | 5 | 3 | [1..5, 9..12]
1..13 | 5 | 3 | [1..5, 9..13]
}
def "Sampling #input taking every #nth th element is #output"() {
expect:
input.stream().collect(sliding(1, nth)) == output
where:
input | nth | output
[] | 1 | []
[] | 5 | []
1..3 | 5 | [[1]]
1..6 | 2 | [[1], [3], [5]]
1..10 | 5 | [[1], [6]]
1..100 | 30 | [[1], [31], [61], [91]]
}
}
不太确定这是否“安全”或“良好”,但也许你们让我知道。
public static <T> Stream<List<T>> sliding(Stream<T> stream, int window) {
Queue<T> queue = new LinkedList<>();
return stream.dropWhile(item -> {
if (queue.size() < window - 1) {
queue.add(item);
return true;
}
return false;
}).map(item -> {
queue.add(item);
List<T> ret = queue.stream().toList();
queue.remove();
return ret;
});
}
public static void main(String[] args) {
sliding(Stream.of(1, 2, 3, 4, 5, 6, 7), 3).forEach(x -> System.out.println(x));
}
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
另一种选择是实现自定义 Spliterator,就像此处:
import java.util.*;
public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
return StreamSupport.stream(
new SlidingWindowSpliterator<>(stream, windowSize), false);
}
private final Queue<T> buffer;
private final Iterator<T> sourceIterator;
private final int windowSize;
private final int size;
private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
this.buffer = new ArrayDeque<>(windowSize);
this.sourceIterator = Objects.requireNonNull(source).iterator();
this.windowSize = windowSize;
this.size = calculateSize(source, windowSize);
}
@Override
public boolean tryAdvance(Consumer<? super Stream<T>> action) {
if (windowSize < 1) {
return false;
}
while (sourceIterator.hasNext()) {
buffer.add(sourceIterator.next());
if (buffer.size() == windowSize) {
action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
buffer.poll();
return sourceIterator.hasNext();
}
}
return false;
}
@Override
public Spliterator<Stream<T>> trySplit() {
return null;
}
@Override
public long estimateSize() {
return size;
}
@Override
public int characteristics() {
return ORDERED | NONNULL | SIZED;
}
private static int calculateSize(Collection<?> source, int windowSize) {
return source.size() < windowSize
? 0
: source.size() - windowSize + 1;
}
}
JEP 461:Stream GatherersJava 22 预览语言功能添加了对此的内置支持:
Stream<List<Integer>> stream =
Stream.of(0, 1, 2, 3, 4).gather(Gatherers.windowSliding(2));
Stream.gather
方法和新的内置 Gatherers.windowSliding
收集器将初始 Stream<T>
转换为成对 Stream<List<T>>
。
Gatherer
:
将输入元素流转换为输出元素流的中间操作,可以选择在到达上游末尾时应用最终操作。 […]
[…]
聚集操作的例子有很多,包括但不限于:将元素分组(窗口函数);对连续相似的元素进行去重;增量累加功能(前缀扫描);增量重新排序功能等。类
提供了常见收集操作的实现。Gatherers
Stream.gather
:
返回一个流,其中包含将给定收集器应用于该流的元素的结果。
Gatherers.windowSliding
返回一个 Gatherer,它将元素收集到给定大小的窗口(遇到有序的元素组)中,其中每个后续窗口都包含前一个窗口的所有元素(除了最近的元素之外),并在流中添加下一个元素。 […]
示例:
// will contain: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]] List<List<Integer>> windows2 = Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList(); // will contain: [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]] List<List<Integer>> windows6 = Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(6)).toList();