我可以在Java 8中复制Stream吗?

问题描述 投票:47回答:7

有时我想在流上执行一组操作,然后以不同的方式处理结果流与其他操作。

我是否可以这样做而无需指定两次常见的初始操作?

例如,我希望存在如下的dup()方法:

Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10
java java-8 java-stream
7个回答
26
投票

一般来说这是不可能的。

如果要复制输入流或输入迭代器,则有两个选项:

A. Keep everything in a collection, say a List<>

假设您将流复制到两个流s1s2。如果你在n1s1元素中使用n2有高级s2元素,你必须在记忆中保留|n2 - n1|元素,以保持步伐。如果您的流是无限的,则可能没有所需存储的上限。

看看Python的tee(),看看它需要什么:

这个itertool可能需要大量的辅助存储(取决于需要存储多少临时数据)。通常,如果一个迭代器在另一个迭代器启动之前使用大部分或全部数据,则使用list()而不是tee()会更快。

B. When possible: Copy the state of the generator that creates the elements

要使此选项起作用,您可能需要访问流的内部工作方式。换句话说,生成器 - 创建元素的部分 - 应该首先支持复制。 [OP:请参阅此great answer,作为如何在问题中为示例执行此操作的示例]

它不适用于用户的输入,因为您必须复制整个“外部世界”的状态。 Java的Stream不支持复制,因为它设计得尽可能通用,专门用于处理文件,网络,键盘,传感器,随机性等。[OP:另一个例子是按需读取温度传感器的流。不存储读数副本就不能复制]

这不仅仅是Java中的情况;这是一般规则。您可以看到C ++中的std::istream仅支持移动语义,而不支持复制语义(“复制构造函数(已删除)”),因此(以及其他)。


40
投票

无法以这种方式复制流。但是,您可以通过将公共部分移动到方法或lambda表达式来避免代码重复。

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);

6
投票

如果你正在缓冲你在一个副本中消耗的元素,但在另一个副本中却没有。

我们在duplicate()中实现了一个jOOλ方法,这是一个开源库,我们创建它来改进jOOQ的集成测试。基本上,你可以写:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();

(注意:我们目前需要打包流,因为我们还没有实现IntSeq

在内部,有一个LinkedList缓冲区,存储从一个流消耗但不从另一个流消耗的所有值。如果您的两个流以相同的速率消耗,那么这可能是有效的。

以下是算法的工作原理:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

More source code here

事实上,使用jOOλ,你将能够像这样写一个完整的单行:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
 .map1(s -> s.filter(n -> n % 7 == 0))
 .map2(s -> s.filter(n -> n % 5 == 0));

// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println)

// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);

4
投票

您还可以将流生成移动到单独的方法/函数中,该方法/函数返回此流并将其调用两次。


3
投票

要么,

  • 将初始化移动到方法中,然后再次调用该方法

这样做的好处是可以明确地了解您正在做什么,并且还适用于无限流。

  • 收集流然后重新流

在你的例子中:

final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();

然后

final IntStream s = IntStream.of(arr);

2
投票

更新:这不起作用。在原始答案的文本之后,请参阅下面的说明。

我有多傻。我需要做的就是:

Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0);
Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10

解释为什么这不起作用:

如果你编码并尝试收集两个流,第一个将收集罚款,但尝试流第二个将抛出异常:java.lang.IllegalStateException: stream has already been operated upon or closed

详细说明,流是有状态对象(顺便说一句,它不能被重置或重绕)。您可以将它们视为迭代器,而迭代器又像指针一样。所以stream14stream10可以被认为是对同一指针的引用。一直使用第一个流将导致指针“越过末尾”。尝试使用第二个流就像尝试访问已经“越过末尾”的指针,这自然是非法操作。

如接受的答案所示,创建流的代码必须执行两次,但可以将其划分为Supplier lambda或类似的构造。

完整的测试代码:保存到Foo.java,然后javac Foo.java,然后java Foo

import java.util.stream.IntStream;

public class Foo {
  public static void main (String [] args) {
    IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0);
    IntStream s1 = s.filter(n -> n % 5 == 0);
    s1.forEach(n -> System.out.println(n));
    IntStream s2 = s.filter(n -> n % 7 == 0);
    s2.forEach(n -> System.out.println(n));
  }
}

输出:

$ javac Foo.java
$ java Foo
0
10
20
30
40
50
60
70
80
90
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.<init>(AbstractPipeline.java:203)
    at java.util.stream.IntPipeline.<init>(IntPipeline.java:91)
    at java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.java:592)
    at java.util.stream.IntPipeline$9.<init>(IntPipeline.java:332)
    at java.util.stream.IntPipeline.filter(IntPipeline.java:331)
    at Foo.main(Foo.java:8)

0
投票

对于非无限流,如果您有权访问源,则直接:

@Test
public void testName() throws Exception {
    List<Integer> integers = Arrays.asList(1, 2, 4, 5, 6, 7, 8, 9, 10);
    Stream<Integer> stream1 = integers.stream();
    Stream<Integer> stream2 = integers.stream();

    stream1.forEach(System.out::println);
    stream2.forEach(System.out::println);
}

版画

1 2 4 5 6 7 8 9 10

1 2 4 5 6 7 8 9 10

对于你的情况:

Stream originalStream = IntStream.range(1, 100).filter(n -> n % 2 == 0)

List<Integer> listOf = originalStream.collect(Collectors.toList())

Stream stream14 = listOf.stream().filter(n -> n % 7 == 0);
Stream stream10 = listOf.stream().filter(n -> n % 5 == 0);

对于表演等,请阅读别人的答案;)

© www.soinside.com 2019 - 2024. All rights reserved.