我是反应式编程的新手,我正在通过 micronaut 框架和 kotlin 使用反应堆。我试图了解反应式编程的优势以及我们如何通过 Mono 和 Flux 使用 Map 和 FlatMap 来实现它。
我了解响应式编程的非阻塞方面,但如果对数据流的操作实际上是异步的,我会感到困惑。
我一直在阅读 FlatMap 并了解它们异步生成内部流,然后将这些流合并到另一个 Flux 而不维护顺序。我看过的许多图表都使它更容易理解,但当涉及到实际用例时,我有一些基本问题。
例子:
fun updateDetials() {
itemDetailsCrudRepository.getItems()
.flatMap {
customerRepository.save(someTransferObject.toEntity(it))
}
}
在上面的示例中,假设
itemDetailsCrudRepository.getItems()
返回特定实体的 Flux。 flatMap 操作必须将通量中的每个项目保存到另一个表中。 customerRepository.save()
将从 flux 中保存项目,我们通过数据类的实例获得所需的实体 someTransferObject
.
现在,假设 getItems() 查询返回了 10 个项目,我们需要在新表中保存 10 行。 flatMap 操作(将这些项目保存到新表中的操作)是一次(同步)应用于通量的每一项,还是所有的保存都是异步发生的?
我读到的一件事是,如果
subscribeOn(Scheduler.parallel())
是 not 应用,那么 flatMap 操作将一次应用于通量中的每个项目(同步)。这个信息对吗?
如果我的基础知识本身不正确,请纠正我。
我觉得flatMap的异步是给next operator的,导致它不符合next operator的顺序。 对于你的例子,如果你想要异步保存(),你可以使用
.parallel(10)
.runOn(Schedulers.parallel())
之前 flatMap 和
.sequential()
在 flatMap 之后。