不明白如何在kotlin中使用flux订阅

问题描述 投票:0回答:2

我是反应式编程的新手。我希望看到

test provider started
Beat 1000
Beat 2000

在日志,但只有test provider started,没有Beaton complete消息。看起来我想念一些东西

@Service
class ProviderService {

    @PostConstruct
    fun start(){
        val hb: Flux<HeartBeat> = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
        val provider = Provider("test", hb)
    }

}
////////////////////////

open class Provider(name: String, heartBests: Flux<HeartBeat>) {
    companion object {
        val log = LoggerFactory.getLogger(Provider::class.java)!!
    }

    init {
        log.info("$name provider started")
        heartBests.doOnComplete { log.info("on complete") }
        heartBests.doOnEach { onBeat(it.get().number) }
    }

    fun onBeat(n: Number){
        log.info("Beat $n")
    }
}

/////
class HeartBeat(val number: Number)
spring kotlin reactive-programming project-reactor
2个回答
2
投票

在你的代码中,从未调用过'doOnComplete'的lambda,因为你创建了无限的流。方法'doOnEach'作为'map'是中间操作(如流中的map),它不进行调用。你有另一个错误,被动反应表明“流畅的模式”。

试试这个简单的例子:

import reactor.core.publisher.Flux
import java.time.Duration

fun main(args: Array<String>) {
    val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }

    println("start")

    flux.take(3)
            .doOnEach { println("on each $it") }
            .map { println("before map");HeartBeat(it.value * 2) }
            .doOnNext { println("on next $it") }
            .doOnComplete { println("on complete") }
            .subscribe { println("subscribe $it") }

    Thread.sleep(5000)
}

data class HeartBeat(val value: Long)

2
投票

这里有三个相当常见的错误:

  • doOnEach这样的运算符会返回一个带有添加行为的新Flux实例,因此您需要(重新)分配给变量或使用流畅的样式
  • 没有任何事情发生,直到你subscribe()(或它的变体.blockXXX也在引擎盖下subscribe ...)
  • 这样的管道是完全异步的,并且由于源Thread的时间维度而在单独的interval上运行。因此,即使您已订阅,控制也会立即返回init,可能导致主线程然后应用程序退出。
© www.soinside.com 2019 - 2024. All rights reserved.