Flux.doOnEach 和 Flux.doOnNext 与 Flux.publishOn 的不同行为

问题描述 投票:0回答:0

我试图理解使用 doOnEach 运算符和空 Flux 进行发布的后果。我有一个简单的测试:

  private static void incrementOnNext(AtomicInteger value, Signal<String> signal) {
    if (signal.isOnNext()) {
      value.incrementAndGet();
    }
  }

  @Test
  void incrementTest() {
    AtomicInteger value = new AtomicInteger(0);
    StepVerifier.create(
            Flux.just("Test text")
                .doOnEach(signal -> incrementOnNext(value, signal))
                .publishOn(Schedulers.boundedElastic())
                .doOnEach(signal -> incrementOnNext(value, signal))
                .doOnEach(signal -> incrementOnNext(value, signal))
                .filter(String::isEmpty))
        .verifyComplete();

    assertThat(value.get()).isEqualTo(3);
  }

我希望这个测试能够通过。但是, doOnEach 只执行一次,我不明白为什么。更重要的是 - 如果我用 doOnNext 更改 doOnEach 测试将按预期工作:

  @Test
  void incrementTestOnNext() {
    AtomicInteger value = new AtomicInteger(0);

    StepVerifier.create(
            Flux.just("Test text")
                .doOnNext(ign -> value.incrementAndGet())
                .publishOn(Schedulers.boundedElastic())
                .doOnNext(ign -> value.incrementAndGet())
                .doOnNext(ign -> value.incrementAndGet())
                .filter(String::isEmpty))
        .verifyComplete();

    assertThat(value.get()).isEqualTo(3);
  }

doOnEach 和 doOnNext 有什么区别?

spring-webflux project-reactor
© www.soinside.com 2019 - 2024. All rights reserved.