下面这个简单的程序在运行一段时间后会挂起(不再产生任何输出)。
// Kotlin
package com.example
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS
/**
 * Reproduction case for the reported hang.
 *
 * Builds a Flowable that counts from 1 to 1_000_000 by recursively
 * concatenating one-element streams, samples it every 5 seconds and prints
 * each sampled value on the calling thread. The accompanying report says the
 * output stops somewhere after roughly 23500 instead of reaching the maximum.
 */
fun main() {
// Wraps `n + 1` in a Single via Single.just.
fun incr(n: Int): Single<Int> = Single.just(n + 1)
// Emits `n`, then (while n < max) continues with the stream for the next value.
// Every recursion step wraps the remainder in another concatWith/concatMap pair,
// so the operator chain gets one level deeper per emitted number.
// NOTE(review): this ever-deepening nesting, combined with the hop to
// Schedulers.single(), is presumably what leads to the hang - confirm.
fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
if (n < max)
// The successor is delivered on the shared single-threaded scheduler, and
// numbers(next, max) is only invoked once that successor arrives.
incr(n).observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max) }
else
// Upper bound reached: nothing more to append.
Flowable.empty()
)
// Print only the most recent value of each 5-second window; blockingForEach
// consumes the stream on the calling thread until it terminates.
numbers(1, 1_000_000).sample(5, SECONDS).blockingForEach(::println)
}
在我的笔记本电脑上,它通常在输出到 23500 之后的某个位置挂起。示例输出:
15945
21159
23802
问题实际上有两个方面:
下面是根据评论中的建议给出的一种可能的解决方案:
package com.example
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.processors.UnicastProcessor
import io.reactivex.rxjava3.processors.FlowableProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS
/**
 * Demo entry point: counts from 1 up to 1_000_000_000 through a
 * processor-backed `unfold`, samples the stream once per second and prints
 * each sampled value on the calling thread.
 */
fun main() {
    /**
     * Generates a stream by feeding every emitted value back into the receiver
     * processor.
     *
     * The receiver is serialized, [seed] is pushed into it, and each value that
     * flows through is passed to [next]: a non-null result is pushed back into
     * the processor as the following element, a `null` result completes it.
     *
     * Clients of `unfold` are responsible for picking a suitable Processor.
     *
     * @param seed first element of the stream
     * @param next computes the successor of an element, or `null` to stop
     * @return the resulting stream, subscribed on the computation scheduler
     */
    fun <T> FlowableProcessor<T>.unfold(seed: T, next: (T) -> T?): Flowable<T> {
        val loop = toSerialized()
        // The seed is pushed immediately at call time, not deferred until subscription.
        loop.onNext(seed)
        return loop
            .subscribeOn(Schedulers.computation())
            .doOnNext { current ->
                val successor = next(current)
                if (successor == null) {
                    loop.onComplete()
                } else {
                    loop.onNext(successor)
                }
            }
    }

    // Example usage: the integers first..max, backed by a UnicastProcessor.
    fun numbers(first: Int, max: Int): Flowable<Int> {
        val source = UnicastProcessor.create<Int>()
        return source.unfold(first) { value -> if (value >= max) null else value + 1 }
    }

    val sampled = numbers(1, 1_000_000_000).sample(1, SECONDS)
    sampled.blockingForEach { println(it) }
}