如何使用在核心库中出售的运算符来递归地生成RxJava Flowable?

问题描述 投票:0回答:1

下面的简单程序最终挂起。

// Kotlin
package com.example

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
    fun incr(n: Int): Single<Int> = Single.just(n + 1)

    fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
        if (n < max)
            incr(n).observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max) }
        else
            Flowable.empty()
    )

    numbers(1, 1_000_000).sample(5, SECONDS).blockingForEach(::println)
}

在我的笔记本电脑上,它通常挂在23500之后的某个位置,示例输出:

15945
21159
23802

问题实际上有两个方面:

  • 是否有可能使用核心库中出售的运算符以堆栈安全,非挂起的方式递归生成RxJava Flowable?
  • 如果是,实现该目标的技术是什么?
rx-java
1个回答
0
投票

一种潜在的解决方案,基于评论中的建议:

package com.example

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.processors.UnicastProcessor
import io.reactivex.rxjava3.processors.FlowableProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
    // clients of 'unfold' are responsible for picking a suitable Processor
    fun <T> FlowableProcessor<T>.unfold(seed: T, next: (T) -> T?): Flowable<T> =
        toSerialized().let { proc ->
            proc.onNext(seed)

            proc
                .subscribeOn(Schedulers.computation())
                .doOnNext { prev ->
                    when (val curr = next(prev)) {
                        null ->
                            proc.onComplete()
                        else ->
                            proc.onNext(curr)
                    }
                }
        }


    //example usage
    fun numbers(first: Int, max: Int): Flowable<Int> =
        UnicastProcessor.create<Int>().unfold(first) { prev -> if (prev < max) prev + 1 else null }

    numbers(1, 1_000_000_000)
        .sample(1, SECONDS)
        .blockingForEach(::println)
}
© www.soinside.com 2019 - 2024. All rights reserved.