我试图理解流在 Kotlin 中的使用,我尝试了不同的代码,特别是我使用了这种方法:
val stateFlow = MutableStateFlow(0)
private fun stateFlowWithContinueCollect() {
runBlocking {
launch {
repeat(10) {
delay(1000)
stateFlow.value = stateFlow.value + 1
}
}
stateFlow.collect { value ->
println("Actual value is $value")
if (value == 6) {
continueInSameFlow()
}
}
}
}
suspend fun continueInSameFlow() {
stateFlow.collect {
println("Im in the second collect, value is $it")
}
}
我知道当我在第一次收集时,它会一直接收值,直到它进入第二次收集时值等于 6,因此它是一个暂停函数,将一直保持直到完成。
有没有办法让两个 collect 同时从同一个 StateFlow 中获取数据,或者这没有意义吗?
我试过将一个收集放在另一个收集下,但除了 Android Studio 警告我它没有意义外,我看到它只执行一个收集而不执行第二个收集。
将对
continueInSameFlow()
的调用替换为 launch { continueInSameFlow() }
,这样您就可以在另一个协程中异步运行第二个收集器。收集会暂停协程,直到流程完成,但 StateFlows 永远不会完成,因此您导致第一个收集挂起。
调用
flow.collect()
,暂停您的协程以等待流程完成发射它会发射的所有项目。 StateFlow 是无限的(SharedFlow 和许多冷流也是如此。)
因此,如果你这样做:
scope.launch {
someInfiniteFlow.collect {
println(it)
}
println("Hello") // unreachable code
}
那么
"Hello"
将永远不会被打印,因为 collect
永远不会返回。它会永远挂起,总是等待下一个项目被发出,然后为每个项目再次运行 lambda。
当然,如果您用另一个对方付费电话代替
println()
,那也永远无法接通。是相同的流还是不同的流都没有关系。无论你在下面做什么,都是无法访问的代码。
scope.launch {
someInfiniteFlow.collect {
println(it)
}
anyFlow.collect { // unreachable code
println(it)
}
}
在您的示例代码中,同样的原则适用。当您开始在外部
collect
lambda 中收集无限流时,您正在调用一个将永远挂起的函数。因此,你的 lambda 的那个迭代永远不会返回,所以它永远不会用另一个要处理的项目再次调用。
因此,如果你想并行收集流(无论它们是否是同一个流),你必须在单独的协程中进行:
scope.launch {
someInfiniteFlow.collect {
println(it)
}
}
scope.launch {
anyFlow.collect {
println(it)
}
}
出于这个原因,收集流的协程除了收集该流之外不做任何事情是很常见的。它是如此常见,以至于 Flows 有一个
launchIn
运算符来帮助使代码更易于阅读(减少嵌套缩进)。这相当于上面的:
someInfiniteFlow.onEach {
println(it)
}.launchIn(scope)
anyFlow.onEach {
println(it)
}.launchIn(scope)
以下是我将如何重写您问题中的代码,以便第一个集合不会被发出“6”后开始的第二个集合打断:
val stateFlow = MutableStateFlow(0)
private fun stateFlowWithContinueCollect() {
runBlocking {
launch {
repeat(10) {
delay(1000)
stateFlow.value = stateFlow.value + 1
}
}
stateFlow.onEach { value ->
println("Actual value is $value")
}.launchIn(this)
stateFlow
.dropWhile { it != 6 }
.onEach {
println("Im in the second collect, value is $it")
}.launchIn(this)
}
}
launchIn
调用正在启动 runBlocking
的 CoroutineScope 的子协程,而不是直接在 runBlocking
协程中同步收集流。
如果你想按照自己的方式去做,你可以。在您的代码中,将对
continueInSameFlow()
的调用包装在 launch { }
中,以便它被异步收集到第一个收集器。
请注意,无论采用哪种方式,因为您正在收集无限流,所以您的外部函数
stateFlowWithContinueCollect()
永远不会返回。