Reactor的FlatMap是异步的吗

问题描述 投票:0回答:1

我是反应式编程的新手,我正在通过 micronaut 框架和 kotlin 使用反应堆。我试图了解反应式编程的优势以及我们如何通过 MonoFlux 使用 MapFlatMap 来实现它。

我了解响应式编程的非阻塞方面,但如果对数据流的操作实际上是异步的,我会感到困惑。

我一直在阅读 FlatMap 并了解它们异步生成内部流,然后将这些流合并到另一个 Flux 而不维护顺序。我看过的许多图表都使它更容易理解,但当涉及到实际用例时,我有一些基本问题。

例子:

fun updateDetials() {
        itemDetailsCrudRepository.getItems()
            .flatMap { 
                customerRepository.save(someTransferObject.toEntity(it))
            }
    }

在上面的示例中,假设

itemDetailsCrudRepository.getItems()
返回特定实体的 Flux。 flatMap 操作必须将通量中的每个项目保存到另一个表中。
customerRepository.save()
将从 flux 中保存项目,我们通过数据类的实例获得所需的实体
someTransferObject
.

现在,假设 getItems() 查询返回了 10 个项目,我们需要在新表中保存 10 行。 flatMap 操作(将这些项目保存到新表中的操作)是一次(同步)应用于通量的每一项,还是所有的保存都是异步发生的?

我读到的一件事是,如果

subscribeOn(Scheduler.parallel())
not 应用,那么 flatMap 操作将一次应用于通量中的每个项目(同步)。这个信息对吗?

如果我的基础知识本身不正确,请纠正我。

asynchronous reactive-programming project-reactor micronaut flatmap
1个回答
0
投票

我觉得flatMap的异步是给next operator的,导致它不符合next operator的顺序。 对于你的例子,如果你想要异步保存(),你可以使用

            .parallel(10)
            .runOn(Schedulers.parallel())

之前 flatMap 和

            .sequential()

在 flatMap 之后。

© www.soinside.com 2019 - 2024. All rights reserved.