我需要在.combineLatest()
上实现以下ReceiveChannel
扩展函数
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
otherSource: ReceiveChannel<B>,
context: CoroutineContext = Unconfined,
combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
// ?
}
我希望它能像RxJava的combineLatest()
一样运行。
我怎样才能做到这一点?
编辑:到目前为止我有这个,但它没有工作。 sourceB.consumeEach{ }
区块从未被激活过。
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
otherSource: ReceiveChannel<B>,
context: CoroutineContext = Unconfined,
combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
val sourceA: ReceiveChannel<A> = this@combineLatest
val sourceB: ReceiveChannel<B> = otherSource
var latestA: A? = null
var latestB: B? = null
sourceA.consumeEach { a ->
latestA = a
if (latestA != null && latestB != null) {
send(combineFunction(latestA!!, latestB!!))
}
}
sourceB.consumeEach { b ->
latestB = b
if (latestA != null && latestB != null) {
send(combineFunction(latestA!!, latestB!!))
}
}
}
我还想确保当这个函数返回的ReceiveChannel<R>
关闭(取消订阅)时,我想确保父通道正确关闭。
这样做了!我仍然困惑为什么我可以在另一个.consumeEach{ }
内嵌入一个.consumeEach { }
- 它似乎不直观。
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
otherSource: ReceiveChannel<B>,
context: CoroutineContext = Unconfined,
combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
val sourceA: ReceiveChannel<A> = this@combineLatest
val sourceB: ReceiveChannel<B> = otherSource
val latestA = AtomicReference<A>()
val latestB = AtomicReference<B>()
var aInitialized = false
var bInitialized = false
sourceA.consumeEach { a ->
latestA.set(a)
aInitialized = true
if (aInitialized && bInitialized) {
send(combineFunction(latestA.get(), latestB.get()))
}
launch(coroutineContext) {
sourceB.consumeEach { b ->
latestB.set(b)
bInitialized = true
if (aInitialized && bInitialized) {
send(combineFunction(latestA.get(), latestB.get()))
}
}
}
}
}
我知道这是一个老问题,但这是一个建议:
我建议使用qazxsw poi而不是嵌套qazxsw poi。查看文档.zip()
。
可能的解决方案.consumeEach
,它生成一个Pair类型的项目。