如何在Kotlin协同例程接收通道上实现RxJava的combineLatest?

问题描述 投票:1回答:2

我需要在.combineLatest()上实现以下ReceiveChannel扩展函数

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
    // ?
}

我希望它能像RxJava的combineLatest()一样运行。

我怎样才能做到这一点?

编辑:到目前为止我有这个,但它没有工作。 sourceB.consumeEach{ }区块从未被激活过。

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {

    val sourceA: ReceiveChannel<A> = this@combineLatest
    val sourceB: ReceiveChannel<B> = otherSource

    var latestA: A? = null
    var latestB: B? = null

    sourceA.consumeEach { a ->
        latestA = a
        if (latestA != null && latestB != null) {
            send(combineFunction(latestA!!, latestB!!))
        }
    }

    sourceB.consumeEach { b ->
        latestB = b
        if (latestA != null && latestB != null) {
            send(combineFunction(latestA!!, latestB!!))
        }
    }
}

我还想确保当这个函数返回的ReceiveChannel<R>关闭(取消订阅)时,我想确保父通道正确关闭。

kotlin reactive-programming kotlinx.coroutines
2个回答
0
投票

这样做了!我仍然困惑为什么我可以在另一个.consumeEach{ }内嵌入一个.consumeEach { } - 它似乎不直观。

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {

    val sourceA: ReceiveChannel<A> = this@combineLatest
    val sourceB: ReceiveChannel<B> = otherSource

    val latestA = AtomicReference<A>()
    val latestB = AtomicReference<B>()

    var aInitialized = false
    var bInitialized = false

    sourceA.consumeEach { a ->
        latestA.set(a)
        aInitialized = true
        if (aInitialized && bInitialized) {
            send(combineFunction(latestA.get(), latestB.get()))
        }

        launch(coroutineContext) {
            sourceB.consumeEach { b ->
                latestB.set(b)
                bInitialized = true
                if (aInitialized && bInitialized) {
                    send(combineFunction(latestA.get(), latestB.get()))
                }
            }
        }
    }
}

0
投票

我知道这是一个老问题,但这是一个建议:

我建议使用qazxsw poi而不是嵌套qazxsw poi。查看文档.zip()

可能的解决方案.consumeEach,它生成一个Pair类型的项目。

© www.soinside.com 2019 - 2024. All rights reserved.