为什么使用switchIfEmpty时项目反应堆会无限期挂起?

问题描述 投票:1回答:1

背景

我正在使用Spring Boot 2.2.1,project-reactor 3.3.0和spring-data-mongodb 2.2.1,并且我试图从多个查询中加载数据。我的代码大致如下:

Flux.just("type1", "type2", "type3", "type4")
    .concatMap { type ->
        reactiveMongoOperations.find<Map<String, Any>>(BasicQuery("{'type': '$type'}"), "collectionName")
                                .doOnError { e ->
                                    log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}", e)
                                }.switchIfEmpty {
                                    log.warn("Failed to find any documents of type $type")
                                    Mono.empty<Map<String, Any>>()
                                }
    } 
    .. // More operations here
    .subscribe()

问题是,如果reactiveMongoOperations.find(..)找不到给定类型的任何文档(因此记录了"Failed to find any documents of type $type",则整个操作将无限期地挂起。如果删除switchIfEmpty子句,则操作完成,一切正常。

问题

  1. 为什么添加switchIfEmpty操作后整个操作为什么会挂起?我使用flatMap而不是concatMap都没关系,它最终还是会挂起。
  2. 我应如何记录该特定查询未找到任何文档?即我想记录一下,当reactiveMongoOperations.find(..)返回空的Flux时找不到任何文档。
spring-boot reactive-programming spring-data-mongodb project-reactor
1个回答
1
投票

当从Kotlin将代码重写为Java时(正如Thomas在评论中所建议的那样),我找到了答案!我以为我使用了reactor.kotlin.core.publisher.switchIfEmpty库提供的Kotlin reactor-kotlin-extensions扩展功能:

fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })

这里不是这种情况,因此我最终使用了switchIfEmpty中定义的Flux方法,如下所示:

public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)

要使其在没有扩展功能的情况下工作,我可能应该做这样的事情:

.. 
.switchIfEmpty { subscriber ->
    log.warn("Failed to find any documents of type $type")
    subscriber.onComplete()
}

我最初的解决方案没有用,因为Java版本假定我create一个Publisher(我做了),还invoke这个发布者上的一个函数(我没有)。在Kotlin中,lambda参数是可选的,如果不需要的话,这就是类型系统没有捕获到该参数的原因。

这是Kotlin与Java互操作的技巧之一。

© www.soinside.com 2019 - 2024. All rights reserved.