有条件地取消()通量并保留导致取消的值

问题描述 投票:0回答:1

我有以下 Java 中的 Reactive Flux 流,

Flux.just(e1, e2, e3, e4, e5)
   .flatMapSequentially(/* process elements */)
   .scan(new Accumulator(null, 0), (element, total) -> new Accumulator(element, element.children.size + total))
   .takeWhile(/* conditionally process till for example e3, need accumulator value here */)
   .map(/* post processing */)

record Accumulator(Element element, int total) {}

takeWhile
中,取消有条件地发生在元素
e3
处,一旦取消,该值将在后续
.map()
中不可用,并且流仅返回
e1, e2
,但我还需要
e3
元素以及
e1, e2
。 如何有条件地保留 e3?

java rxjs spring-webflux reactive-programming project-reactor
1个回答
0
投票

您可以使用 takeUntil 代替:

这包括匹配数据(与 takeWhile(java.util.function.Predicate)不同)。

示例:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class TakeUntilTest {

    @Test
    public void takeUntilIncludesStopElement() {
        var datasStream = Flux.range(1, 10)
                // Stop when we encounter value 3
                .takeUntil(i -> i.equals(3));
        StepVerifier.create(datasStream)
                // verify resulting flow include stop value
                .expectNext(1, 2, 3)
                .verifyComplete();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.