我正在使用 kotlin,我想使用流传输可能巨大的结果集。我在网上找到了一些解释:
我实现了它,效果很好。我还需要在将结果发送到外部服务之前对结果进行批处理,因此我对流实现了分块操作。类似这样的东西:
fun <T> Flow<T>.chunked(chunkSize: Int): Flow<List<T>> {
return callbackFlow {
val listOfResult = mutableListOf<T>()
[email protected] {
listOfResult.add(it)
if (listOfResult.size == chunkSize) {
trySendBlocking(listOfResult.toList())
listOfResult.clear()
}
}
if (listOfResult.isNotEmpty()) {
trySendBlocking(listOfResult)
}
close()
}
}
为了确保一切正常,我创建了一些集成测试:
所以最后一次测试表明实现上有问题。 我进行了很多调查,但没有发现任何有用的东西,但是,在转储线程时,我发现一个协程线程在第一个流(在 jdbc 存储库中创建的流)的 trySendBlocking 调用中被阻塞。 我想知道分块运算符应该以哪种方式将关闭传播到上游流,因为这部分似乎丢失了。 在这两种情况下,我都使用 close() 调用向下游传播数据末尾,但我查看了 take 运算符,发现它正在使用 emitAbort(...) 触发关闭 我应该在callbackFlow{...}中做类似的事情吗? 经过一番调查后,我能够避免锁定,在存储库内的 trySendBlocking 上添加超时,但我不喜欢这样。最后,我意识到我可以将原始流(在分块运算符中)投射到 SendChannel 并在下游流关闭时关闭它:
trySendBlocking(listOfResult.toList()).onSuccess {
LOGGER.debug("Sent")
}.onFailure {
LOGGER.warn("An error occurred sending data.", it)
}.onClosed {
LOGGER.info("Channel has been closed")
(originalFlow as SendChannel<*>).close(it)
}
这是关闭倒流的正确方法吗?有解决这个问题的提示吗? 谢谢!
您不应该使用
trySendBlocking
代替 send
。如果没有使用可以处理阻塞代码的调度程序(例如 Dispatchers.Default)将其包装在 withContext
中,则永远不要在协程中使用阻塞函数。但是,当有替代挂起功能时,请使用它,在本例中 send()
。
此外,
callbackFlow
比转换流所需的更加复杂。您应该使用标准 flow
构建器(因此您将使用 emit()
而不是 send()
)。
fun <T> Flow<T>.chunked(chunkSize: Int): Flow<List<T>> = flow {
var listOfResult = mutableListOf<T>()
[email protected] {
listOfResult.add(it)
if (listOfResult.size == chunkSize) {
emit(listOfResult.toList())
listOfResult = mutableListOf()
}
}
if (listOfResult.isNotEmpty()) {
emit(listOfResult)
}
}