Reactor GroupedFlux - 等待完成

问题描述 投票:0回答:2

拥有像下面这样的异步发布者,Project Reactor 有没有一种方法可以等待整个流处理完成?
当然,无需添加未知持续时间的睡眠...

@Test
public void groupByPublishOn() throws InterruptedException {
    UnicastProcessor<Integer> processor = UnicastProcessor.create();

    List<Integer> results = new ArrayList<>();
    Flux<Flux<Integer>> groupPublisher = processor.publish(1)
                                                  .autoConnect()
                                                  .groupBy(i -> i % 2)
                                                  .map(group -> group.publishOn(Schedulers.parallel()));

    groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));

    List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
    input.forEach(processor::onNext);
    processor.onComplete();

    Thread.sleep(500);

    Assert.assertTrue(results.size() == input.size());
}
project-reactor
2个回答
6
投票

您可以替换这些行:

 groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));

有了这个

groupPublisher.log()
              .flatMap(g -> g.log()
                             .doOnNext(results::add)
              )
              .blockLast();

flatMap
是比订阅中订阅更好的模式,并且会为您负责订阅群组。

doOnNext
负责处理消耗副作用(向集合添加值),使您无需在订阅中执行该操作。

blockLast()
取代了订阅,而不是让您为事件提供处理程序,它会阻塞直到完成(并返回最后发出的项目,但您已经在 doOnNext 中处理了这一点)。


2
投票

使用blockLast()的主要问题是,如果您的操作无法完成,您将永远不会释放管道。

您需要做的是获取 Disposable 并检查是否已完成管道,这意味着布尔值 isDispose 它将返回 true。

然后由你决定是否想要超时,就像惰性计数实现:)

@Test
public void checkIfItDisposable() throws InterruptedException {
  Disposable subscribe = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .map(number -> {
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return number;
      }).subscribeOn(Schedulers.newElastic("1"))
      .subscribe();
  while (!subscribe.isDisposed() && count < 100) {
    Thread.sleep(400);
    count++;
    System.out.println("Waiting......");
  }
  System.out.println("It disposable:" + subscribe.isDisposed());
}

如果你想使用blockLast,至少添加一个超时

@Test
public void checkIfItDisposableBlocking() throws InterruptedException {
  Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .map(number -> {
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return number;
      }).subscribeOn(Schedulers.newElastic("1"))
      .blockLast(Duration.of(60, ChronoUnit.SECONDS));
  System.out.println("It disposable");
}

如果您需要更多IDE,可以在这里查看更多Reactor示例https://github.com/politrons/reactive

© www.soinside.com 2019 - 2024. All rights reserved.