如何将函数应用于 rx-java Flowable 的每个元素?

问题描述 投票:0回答:1

如何将函数应用于 Flowable 中的每个元素?我试图将一个函数应用于可流动发布者中的每个项目并收集结果,但该函数永远不会被调用。

CompletionStage<Void> batchUpdate(Page<AClass> page); //Given

Completion<Void> updateItems(){
    final var futures = new ArrayList<CompletionStage<Void>>();
    final Flowable<Page<AClass>> flowable = getFlowablePublisher();
    flowable.map(this::batchUpdate).map(futures::add); //batchUpdate never gets called
    return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
}
java rx-java
1个回答
0
投票

在您当前的实现中,batchUpdate 函数没有被调用,因为您需要触发 Flowable 的处理。 Flowable 是一种反应式流,遵循惰性评估方法。在您订阅之前,它不会开始处理数据。

要将batchUpdate函数应用到Flowable中的每个项目并收集结果,您需要订阅Flowable。您可以通过以下方式修改代码来实现此目的:

import io.reactivex.Flowable;

// ...

Completion<Void> updateItems() {
    final var futures = new ArrayList<CompletionStage<Void>>();
    final Flowable<Page<AClass>> flowable = getFlowablePublisher();

    flowable.flatMap(page -> {
        // Apply the batchUpdate function to each item and collect the results.
        CompletionStage<Void> future = batchUpdate(page);
        futures.add(future);
        return future;
    }).subscribe();

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

在此更新的代码中,我们使用 flatMap 运算符将batchUpdate 函数应用于 Flowable 中的每个项目,并将结果收集到 futures 列表中。 flatMap 运算符返回一个新的 Flowable,该 Flowable 发出由 batchUpdate 函数返回的 CompletionStage 的结果。

但是,您可能会注意到 Completion 的返回类型不是有效的 Java 语法。应该是完成阶段。因此,请确保在实际代码中相应地更新返回类型。

此外,请确保 getFlowablePublisher() 方法正确地向 Flowable 提供要处理的数据。

请注意,在反应式编程中,您通常使用异步操作,因此使用 CompletionStage 或类似类型很常见。请务必在实际实施中适当处理异常和错误场景。

© www.soinside.com 2019 - 2024. All rights reserved.