在 Kotlin 协程中,如何验证 if 和 when 发生任何流背压,也就是说,如果我有一个正在使用
buffer()
的流,我如何在每次生产者和/或消费者因背压而暂停自己?
我想这样做是为了微调流缓冲区大小以及消费者端的并行性。为了举例说明,我将提供一个简单的示例:
suspend fun main() {
val bufferedFlow = createFlow().buffer()
// Here, the collector is much slower than the producer
bufferedFlow.collect {
delay(100)
println("Collecting: $it")
}
}
// Produces numbers at a fast pace
fun createFlow(): Flow<Int> = (0..Integer.MAX_VALUE).asFlow().onEach {
delay(10)
println("Emitting: $it")
}
在这个例子中,消费者被生产者压垮了,但由于生产者支持背压,所以它会被挂起,直到消费者从缓冲区消费完一个项目。当然,在这个例子中,很明显哪一侧是快,哪一侧是慢,但在某些情况下事情并不那么清楚。
如果想要检测并记录每次生产者或消费者由于另一方的背压而暂停时,他该怎么做?
首先,我认为“背压”没有通用含义,它确实取决于具体情况。例如,如果我们不使用缓冲区并通过简单的打印来收集,那么从技术上讲,生产者仍然等待消费者打印该项目。但通常我们不认为这是真正的背压。
我们可以假设背压是指收集器挂起的情况。我认为创建这样的通用运算符在技术上应该是可行的。它需要检测下游收集器是否挂起,然后通知这一点。然而,这可能有点 hacky,因为使用协程 API 检测挂起的被调用者并不容易(但我认为仍然有可能)。
相反,我建议采用另一种方法。通常,当我们谈论背压时,我们指的是使用缓冲区同时运行生产者和消费者的情况。由于缓冲区操作员协调双方之间的通信,因此这是我们可以轻松检测背压的地方:
fun <T> Flow<T>.bufferWithOverflowDetection(capacity: Int = Channel.BUFFERED, onOverflow: (T) -> Unit): Flow<T> = flow {
coroutineScope {
val channel = produce(capacity = capacity) {
collect {
// collecting from upstream
if (trySend(it).isFailure) {
onOverflow(it)
send(it)
}
}
}
channel.consumeEach {
// emitting do downstream
emit(it)
}
}
}
然后只需替换示例中的一行:
val bufferedFlow = createFlow().bufferWithOverflowDetection { println("Overflow with: $it") }
此外,如果您有类似的需求,那么您可以考虑完全迁移到渠道,因为它们通常是协调生产者和消费者的更强大的实用程序。不过,正如我们在上面看到的,有时我们可以一起使用这两个 API。