背景
我正在使用Spring Boot 2.2.1,project-reactor 3.3.0和spring-data-mongodb 2.2.1,并且我试图从多个查询中加载数据。我的代码大致如下:
Flux.just("type1", "type2", "type3", "type4")
.concatMap { type ->
reactiveMongoOperations.find<Map<String, Any>>(BasicQuery("{'type': '$type'}"), "collectionName")
.doOnError { e ->
log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}", e)
}.switchIfEmpty {
log.warn("Failed to find any documents of type $type")
Mono.empty<Map<String, Any>>()
}
}
.. // More operations here
.subscribe()
问题是,如果reactiveMongoOperations.find(..)
找不到给定类型的任何文档(因此记录了"Failed to find any documents of type $type"
,则整个操作将无限期地挂起。如果删除switchIfEmpty
子句,则操作完成,一切正常。
问题
switchIfEmpty
操作后整个操作为什么会挂起?我使用flatMap
而不是concatMap
都没关系,它最终还是会挂起。reactiveMongoOperations.find(..)
返回空的Flux
时找不到任何文档。当从Kotlin将代码重写为Java时(正如Thomas在评论中所建议的那样),我找到了答案!我以为我使用了reactor.kotlin.core.publisher.switchIfEmpty
库提供的Kotlin reactor-kotlin-extensions
扩展功能:
fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })
这里不是这种情况,因此我最终使用了switchIfEmpty
中定义的Flux
方法,如下所示:
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
要使其在没有扩展功能的情况下工作,我可能应该做这样的事情:
..
.switchIfEmpty { subscriber ->
log.warn("Failed to find any documents of type $type")
subscriber.onComplete()
}
我最初的解决方案没有用,因为Java版本假定我create一个Publisher
(我做了),还invoke这个发布者上的一个函数(我没有)。在Kotlin中,lambda参数是可选的,如果不需要的话,这就是类型系统没有捕获到该参数的原因。
这是Kotlin与Java互操作的技巧之一。