Flux.fromStream fails when splitting and merging

Votes: 1 Answers: 2

I have this sample code:

Flux<Integer> range = Flux.range(0, 10);
Flux<Long> longs = Flux.fromStream(new Random().longs(100, 500).boxed()); // (1)
// Flux<Long> longs = Flux.fromIterable(new Random().longs(100, 500).boxed().limit(30).collect(Collectors.toList())); // (2)

Flux<Tuple2<Integer, Long>> flux1 = Flux.zip(range, longs);

Flux<Integer> flux2 = flux1.map(e -> 2);
Flux<Integer> flux3 = flux1.map(e -> 3);

CountDownLatch countDownLatch = new CountDownLatch(1);

Flux.merge(flux2, flux3)
   .doOnComplete(() -> countDownLatch.countDown())
   .subscribe(e -> log.info("{}", e));

countDownLatch.await(1, TimeUnit.MINUTES);

This fails with:

Caused by: java.lang.IllegalStateException: stream has already been operated upon or closed
  at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:343)
  at java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:139)
  at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:57)
  at reactor.core.publisher.Flux.subscribe(Flux.java:7777)
  at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:579)
  ...

Commenting out line (1) and uncommenting line (2) fixes the problem, but in my use case longs is infinite, as in (1). How can I solve this?

The real use case is to do something when flux2 and flux3 complete; they have side effects in their map calls (in this case, writing to a file), so I need to make sure everything has been written before exiting.

java project-reactor
2 Answers

0 votes

A Flux is designed to be subscribed to more than once, but in line (1) you use Flux.fromStream to consume a Java Stream, which can only be consumed once; that is why you get stream has already been operated upon or closed.

One solution is to duplicate the longs stream by using Randoms with the same seed.

long seed = 1000000;

Flux<Long> longs = Flux.fromStream(new Random(seed).longs(100, 500).boxed());      
Flux<Long> longs1 = Flux.fromStream(new Random(seed).longs(100, 500).boxed()); 

Flux<Integer> flux2 = Flux.zip(range, longs).map(e -> 2);
Flux<Integer> flux3 = Flux.zip(range, longs1).map(e -> 3);
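The reason this workaround keeps flux2 and flux3 consistent is that two Random instances constructed with the same seed emit identical sequences. A minimal pure-JDK sketch (the seed value and limit are arbitrary, chosen just for illustration):

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class SeededStreams {
    public static void main(String[] args) {
        long seed = 1_000_000L;
        // Two independent streams, built from identically seeded Randoms.
        List<Long> a = new Random(seed).longs(100, 500).limit(10).boxed()
                .collect(Collectors.toList());
        List<Long> b = new Random(seed).longs(100, 500).limit(10).boxed()
                .collect(Collectors.toList());
        System.out.println(a.equals(b)); // prints true: same seed, same sequence
    }
}
```

Note that this ties correctness to the determinism of the seed; if the source of longs were not reproducible (a network feed, say), this trick would not apply.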

0 votes

You can use defer:

Flux<Long> longs = Flux.defer(() -> Flux.fromStream(new Random().longs(100, 500).boxed()));

Even though you have only one explicit subscribe, longs ends up being subscribed to multiple times (once through flux2 and once through flux3). With defer, a fresh Stream is created for each of those subscriptions.

Flux.fromStream can be subscribed to only once, because a Java Stream can be consumed only once.

defer solves this by creating a new Stream for each subscriber.
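The single-use behavior can be demonstrated with plain JDK streams, no Reactor required; the Supplier below plays the role Flux.defer plays, building a brand-new Stream per consumer (the limits and bounds are arbitrary, for illustration only):

```java
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class DeferSketch {
    public static void main(String[] args) {
        Stream<Long> once = new Random().longs(100, 500).boxed();
        once.limit(3).count(); // first consumption succeeds
        try {
            once.limit(3).count(); // second consumption fails
        } catch (IllegalStateException e) {
            // Same failure mode as the question's stack trace.
            System.out.println(e.getMessage());
        }

        // The deferred version: every call builds a fresh, unconsumed Stream,
        // so it can be "subscribed" to any number of times.
        Supplier<Stream<Long>> deferred = () -> new Random().longs(100, 500).boxed();
        deferred.get().limit(3).count(); // fine
        deferred.get().limit(3).count(); // fine again
    }
}
```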
