我正在尝试实现一个在其实现中使用其自身的另一个实例的流。该流前面有一些常量元素(使用 IntStream.concat),因此只要串联流惰性地创建非常量部分,这应该就可以工作。我认为使用 StreamSupport.intStream 重载采用带有 IntStream.concat 的Supplier(其中“创建一个延迟连接的流”)应该足够懒,只在需要元素时创建第二个分割器,但甚至创建流(不评估它)溢出堆栈。如何延迟连接流?
我正在尝试将流式素数筛从这个答案移植到Java中。该筛子使用其自身的另一个实例(Python 代码中的
ps = postponed_sieve()
)。如果我将最初的四个常量元素 (yield 2; yield 3; yield 5; yield 7;
) 分解到它们自己的流中,则很容易将生成器实现为分割器:
/**
* based on https://stackoverflow.com/a/10733621/3614835
*/
static class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
private final Map<Integer, Supplier<IntStream>> sieve = new HashMap<>();
private final PrimitiveIterator.OfInt postponedSieve = primes().iterator();
private int p, q, c = 9;
private Supplier<IntStream> s;
PrimeSpliterator() {
super(105097564 /* according to Wolfram Alpha */ - 4 /* in prefix */,
CHARACTERISTICS);
//p = next(ps) and next(ps) (that's Pythonic?)
postponedSieve.nextInt();
this.p = postponedSieve.nextInt();
this.q = p*p;
}
@Override
public boolean tryAdvance(IntConsumer action) {
for (; c > 0 /* overflow */; c += 2) {
Supplier<IntStream> maybeS = sieve.remove(c);
if (maybeS != null)
s = maybeS;
else if (c < q) {
action.accept(c);
return true; //continue
} else {
s = () -> IntStream.iterate(q+2*p, x -> x + 2*p);
p = postponedSieve.nextInt();
q = p*p;
}
int m = s.get().filter(x -> !sieve.containsKey(x)).findFirst().getAsInt();
sieve.put(m, s);
}
return false;
}
}
我第一次尝试 primes() 方法返回一个 IntStream,将常量流与新的 PrimeSpliterator 连接起来:
public static IntStream primes() {
return IntStream.concat(IntStream.of(2, 3, 5, 7),
StreamSupport.intStream(new PrimeSpliterator()));
}
调用 primes() 会导致 StackOverflowError,因为 primes() 始终实例化 PrimeSpliterator,但 PrimeSpliterator 的字段初始值设定项始终调用 primes()。然而,StreamSupport.intStream 的重载需要一个供应商,这应该允许延迟创建 PrimeSpliterator:
public static IntStream primes() {
return IntStream.concat(IntStream.of(2, 3, 5, 7),
StreamSupport.intStream(PrimeSpliterator::new, PrimeSpliterator.CHARACTERISTICS, false));
}
然而,我却得到了一个具有不同回溯的 StackOverflowError(在重复时进行了修剪)。请注意,递归完全在对 primes() 的调用中——终端操作 iterator() 永远不会在返回的流上调用。
Exception in thread "main" java.lang.StackOverflowError
at java.util.stream.StreamSpliterators$DelegatingSpliterator$OfInt.<init>(StreamSpliterators.java:582)
at java.util.stream.IntPipeline.lazySpliterator(IntPipeline.java:155)
at java.util.stream.IntPipeline$Head.lazySpliterator(IntPipeline.java:514)
at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:352)
at java.util.stream.IntPipeline.spliterator(IntPipeline.java:181)
at java.util.stream.IntStream.concat(IntStream.java:851)
at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
at java.util.stream.IntStream.concat(IntStream.java:851)
at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
at java.util.stream.IntStream.concat(IntStream.java:851)
at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
如何足够懒惰地连接流以允许流在其实现中使用自身的另一个副本?
您显然假设 Streams API 将其惰性保证扩展到了 spliterator 的实例化;这是不正确的。它期望能够在实际消费开始之前随时实例化流的分割器,例如只是为了找出流的特征和报告的大小。仅通过调用
trySplit
、tryAdvance
或 forEachRemaining
开始消费。
考虑到这一点,您可以在需要之前初始化推迟的筛选。在
else if
中的 tryAdvance
部分之前,您无法使用其任何结果。因此,将代码移至最后可能的时刻,以保证正确性:
@Override
public boolean tryAdvance(IntConsumer action) {
for (; c > 0 /* overflow */; c += 2) {
Supplier<IntStream> maybeS = sieve.remove(c);
if (maybeS != null)
s = maybeS;
else {
if (postponedSieve == null) {
postponedSieve = primes().iterator();
postponedSieve.nextInt();
this.p = postponedSieve.nextInt();
this.q = p*p;
}
if (c < q) {
action.accept(c);
return true; //continue
我认为,通过这一更改,即使您第一次尝试
primes()
也应该会起作用。
如果您想继续使用当前的方法,您可以使用以下习语:
Stream.<Supplier<IntStream>>of(
()->IntStream.of(2, 3, 5, 7),
()->intStream(new PrimeSpliterator()))
.flatMap(Supplier::get);
您可能会发现这给了您足够的懒惰。
我喜欢用
Supplier
来做到这一点:
return Stream.<Supplier<Stream<WhatEver>>of(
() -> generateStreamOfWhatEverAndChangeSomeState(input, state),
() -> generateStreamOfMoreWhatEversDependendingOnMutatedState(state)
).flatMap(Supplier::get);
由于流是延迟评估的,
generateStreamOfWhatEverAndChangeSomeState()
将在generateStreamOfMoreWhatEversDependendingOnMutatedState()
开始之前完成,并且state
将被更新。
我应该指出,这可能不是
Stream
的设计师的初衷。理想情况下,Stream
不应该改变状态,只读取每个项目并生成一个新项目。
@selalerercapitolis 的答案对我的用例非常有帮助,这与突变无关:从连接流中获取元素,直到其中一个元素不具有特定属性。
示例:
Stream<String> concatenated = Stream.of(supplierA, supplierB).flatMap(Supplier::get); // selalerercapitolis's answer.
List<String> result = concatenated.takeWhile(s -> s.length() > 5).toList();
supplierB
生成一个创建起来有点昂贵的流,因此如果 supplierA
生成的元素之一不符合 takeWhile()
测试的条件,我会避免创建它。也就是说,如果 result
将仅包含由 supplierA
生成的元素。