我想从通道中读取所有可用元素,以便我可以对它们进行批处理,如果我的接收器比我的发送器慢(希望处理批处理将更高性能并允许接收器赶上)。我只想暂停,如果频道是空的,不要暂停,直到我的批处理已满或超时不像this question。
标准kotlin库中是否有任何内容可以实现此目的?
我没有在标准的kotlin库中找到任何东西,但这是我想出来的。这将仅暂停第一个元素,然后poll
暂停所有剩余元素。这只适用于Buffered Channel,以便准备好处理的元素排队并可用于poll
/**
* Receive all available elements up to [max]. Suspends for the first element if the channel is empty
*/
internal suspend fun <E> ReceiveChannel<E>.receiveAvailable(max: Int): List<E> {
if (max <= 0) {
return emptyList()
}
val batch = mutableListOf<E>()
if (this.isEmpty) {
// suspend until the next message is ready
batch.add(receive())
}
fun pollUntilMax() = if (batch.size >= max) null else poll()
// consume all other messages that are ready
var next = pollUntilMax()
while (next != null) {
batch.add(next)
next = pollUntilMax()
}
return batch
}
我测试了Jakes代码,它对我很有用(谢谢!)。没有最大限制,我把它归结为:
suspend fun <E> ReceiveChannel<E>.receiveAvailable(): List<E> {
val allMessages = mutableListOf<E>()
allMessages.add(receive())
var next = poll()
while (next != null) {
allMessages.add(next)
next = poll()
}
return allMessages
}