我有以下 Java 中的 Reactive Flux 流,
Flux.just(e1, e2, e3, e4, e5)
.flatMapSequentially(/* process elements */)
.scan(new Accumulator(null, 0), (element, total) -> new Accumulator(element, element.children.size + total))
.takeWhile(/* conditionally process till for example e3, need accumulator value here */)
.map(/* post processing */)
record Accumulator(Element element, int total) {}
在
takeWhile
中,取消有条件地发生在元素 e3
处,一旦取消,该值将在后续 .map()
中不可用,并且流仅返回 e1, e2
,但我还需要 e3
元素以及e1, e2
。
如何有条件地保留 e3?
您可以使用 takeUntil 代替:
这包括匹配数据(与 takeWhile(java.util.function.Predicate)不同)。
示例:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class TakeUntilTest {
@Test
public void takeUntilIncludesStopElement() {
var datasStream = Flux.range(1, 10)
// Stop when we encounter value 3
.takeUntil(i -> i.equals(3));
StepVerifier.create(datasStream)
// verify resulting flow include stop value
.expectNext(1, 2, 3)
.verifyComplete();
}
}