同一个StateFlow可以使用两个collect吗?

问题描述 投票:0回答:1

我试图理解流在 Kotlin 中的使用,我尝试了不同的代码,特别是我使用了这种方法:

val stateFlow = MutableStateFlow(0) 

private fun stateFlowWithContinueCollect() {
    runBlocking {

        launch {
            repeat(10) {
                delay(1000)
                stateFlow.value = stateFlow.value + 1
            }
        }

        stateFlow.collect { value ->
            println("Actual value is $value")
            if (value == 6) {
                continueInSameFlow()
            }
        }
    }
}

suspend fun continueInSameFlow() {
    stateFlow.collect {
        println("Im in the second collect, value is $it")
    }
}

我知道当我在第一次收集时,它会一直接收值,直到它进入第二次收集时值等于 6,因此它是一个暂停函数,将一直保持直到完成。

有没有办法让两个 collect 同时从同一个 StateFlow 中获取数据,或者这没有意义吗?

我试过将一个收集放在另一个收集下,但除了 Android Studio 警告我它没有意义外,我看到它只执行一个收集而不执行第二个收集。

kotlin stateflow flow
1个回答
0
投票

总结

将对

continueInSameFlow()
的调用替换为
launch { continueInSameFlow() }
,这样您就可以在另一个协程中异步运行第二个收集器。收集会暂停协程,直到流程完成,但 StateFlows 永远不会完成,因此您导致第一个收集挂起。

详细解说

调用

flow.collect()
,暂停您的协程以等待流程完成发射它会发射的所有项目。 StateFlow 是无限的(SharedFlow 和许多冷流也是如此。)

因此,如果你这样做:

scope.launch {
    someInfiniteFlow.collect {
        println(it)
    }

    println("Hello") // unreachable code
}

那么

"Hello"
将永远不会被打印,因为
collect
永远不会返回。它会永远挂起,总是等待下一个项目被发出,然后为每个项目再次运行 lambda。

当然,如果您用另一个对方付费电话代替

println()
,那也永远无法接通。是相同的流还是不同的流都没有关系。无论你在下面做什么,都是无法访问的代码。

scope.launch {
    someInfiniteFlow.collect {
        println(it)
    }

    anyFlow.collect { // unreachable code
        println(it)
    }
}

在您的示例代码中,同样的原则适用。当您开始在外部

collect
lambda 中收集无限流时,您正在调用一个将永远挂起的函数。因此,你的 lambda 的那个迭代永远不会返回,所以它永远不会用另一个要处理的项目再次调用。


因此,如果你想并行收集流(无论它们是否是同一个流),你必须在单独的协程中进行:

scope.launch {
    someInfiniteFlow.collect {
        println(it)
    }
}

scope.launch {
    anyFlow.collect {
        println(it)
    }
}

出于这个原因,收集流的协程除了收集该流之外不做任何事情是很常见的。它是如此常见,以至于 Flows 有一个

launchIn
运算符来帮助使代码更易于阅读(减少嵌套缩进)。这相当于上面的:

someInfiniteFlow.onEach {
    println(it)
}.launchIn(scope)

anyFlow.onEach {
    println(it)
}.launchIn(scope)

以下是我将如何重写您问题中的代码,以便第一个集合不会被发出“6”后开始的第二个集合打断:

val stateFlow = MutableStateFlow(0) 

private fun stateFlowWithContinueCollect() {
    runBlocking {

        launch {
            repeat(10) {
                delay(1000)
                stateFlow.value = stateFlow.value + 1
            }
        }

        stateFlow.onEach { value ->
                println("Actual value is $value")
            }.launchIn(this)

        stateFlow
            .dropWhile { it != 6 }
            .onEach {
                println("Im in the second collect, value is $it")
            }.launchIn(this)
    }
}

launchIn
调用正在启动
runBlocking
的 CoroutineScope 的子协程,而不是直接在
runBlocking
协程中同步收集流。

如果你想按照自己的方式去做,你可以。在您的代码中,将对

continueInSameFlow()
的调用包装在
launch { }
中,以便它被异步收集到第一个收集器。

请注意,无论采用哪种方式,因为您正在收集无限流,所以您的外部函数

stateFlowWithContinueCollect()
永远不会返回。

© www.soinside.com 2019 - 2024. All rights reserved.