我试图理解使用 doOnEach 运算符和空 Flux 进行发布的后果。我有一个简单的测试:
private static void incrementOnNext(AtomicInteger value, Signal<String> signal) {
if (signal.isOnNext()) {
value.incrementAndGet();
}
}
@Test
void incrementTest() {
AtomicInteger value = new AtomicInteger(0);
StepVerifier.create(
Flux.just("Test text")
.doOnEach(signal -> incrementOnNext(value, signal))
.publishOn(Schedulers.boundedElastic())
.doOnEach(signal -> incrementOnNext(value, signal))
.doOnEach(signal -> incrementOnNext(value, signal))
.filter(String::isEmpty))
.verifyComplete();
assertThat(value.get()).isEqualTo(3);
}
我希望这个测试能够通过。但是, doOnEach 只执行一次,我不明白为什么。更重要的是 - 如果我用 doOnNext 更改 doOnEach 测试将按预期工作:
@Test
void incrementTestOnNext() {
AtomicInteger value = new AtomicInteger(0);
StepVerifier.create(
Flux.just("Test text")
.doOnNext(ign -> value.incrementAndGet())
.publishOn(Schedulers.boundedElastic())
.doOnNext(ign -> value.incrementAndGet())
.doOnNext(ign -> value.incrementAndGet())
.filter(String::isEmpty))
.verifyComplete();
assertThat(value.get()).isEqualTo(3);
}
doOnEach 和 doOnNext 有什么区别?