如果满足异步条件,如何映射通量停止?

问题描述 投票:0回答:1

考虑到我有大量的整数,并且此方法模拟异步外部 api 数据检索,它可以为某些特定的未知输入返回空响应:

public static Mono<String> getApiData(int i) {
    if (i == 3) return Mono.empty(); // i'm using 3 just as an example
    return Mono.just(String.valueOf(i * 2));
}

这些方法将根据结果执行

getApiData
输出:

// when getApiData returns non empty mono
public static Mono<Boolean> updateDatabaseWithApiData(int apiInput, String apiOutput) {
    System.out.println(apiInput + " -> " + apiOutput);
    // lots of unrelated logic
    return Mono.just(true);
}

// when getApiData returns empty mono
public static Mono<Boolean> logFailure(int apiInput) {
    System.out.println(apiInput + " -> failure");
    // registering errors logs
    return Mono.just(false);
}

用这个我想编写一个像这样的方法

Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux)
,它对每个元素应用
getApiData
,并且在发生故障时停止。那么如果至少有一个元素达到
updateDatabaseWithApiData
,则返回
Mono.just(true)
,否则返回
Mono.just(false)

所以我会得到这个输出:

public static void main(String[] args) {
    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
    processFluxUntilFailure(flux).subscribe(value -> System.out.println("result " + value));
}

所需输出:

1 -> 2
2 -> 4
3 -> failure
result true

因为我们已经处理了(至少 1)2 个成功的元素。

考虑到:

  • 这是我真正问题的简化版本
  • 我无法预测数据何时会为空
    getApiData
  • 我无法改变所描述的方法,只能
    processFluxUntilFailure

我试过这个:

public static Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) {
    return flux.flatMap(apiInput -> getApiData(apiInput)
                    .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput))
                    .switchIfEmpty(Mono.defer(() -> logFailure(apiInput)))
            )
            .reduce((b1, b2) -> b1 || b2);
}

这导致了

1 -> 2
2 -> 4
3 -> failure
4 -> 8
5 -> 10
result true

我怎样才能从这次尝试中获得我想要的输出?换句话说,如果满足某些异步条件,我如何“停止”

flatMap

java asynchronous project-reactor flatmap
1个回答
0
投票

如果有任何不那么冗长的方法,我很乐意:

public static Mono<Boolean> processAndUpdate(Flux<Integer> flux) {
    return flux

            .flatMap(apiInput -> getApiData(apiInput)
                    .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput))
                    .switchIfEmpty(Mono.defer(() -> logFailure(apiInput)))
            ).<Boolean>handle((b, sink) -> {
                if (b) sink.next(true);
                else sink.complete();
            })

            .defaultIfEmpty(false)
            .reduce((b1, b2) -> b1 || b2);
}

给我这个输出:

1 -> 2
2 -> 4
3 -> failure
result true

显然

flatMap
handle
并没有急于评估,将它们放在一起就成功了

© www.soinside.com 2019 - 2024. All rights reserved.