如何在 kotlin 中将闭包传播到流链

问题描述 投票:0回答:1

我正在使用 kotlin,我想使用流传输可能巨大的结果集。我在网上找到了一些解释:

我实现了它,效果很好。我还需要在将结果发送到外部服务之前对结果进行批处理,因此我对流实现了分块操作。类似这样的东西:

fun <T> Flow<T>.chunked(chunkSize: Int): Flow<List<T>> {
  return callbackFlow {
    val listOfResult = mutableListOf<T>()
    [email protected] {
      listOfResult.add(it)
      if (listOfResult.size == chunkSize) {
        trySendBlocking(listOfResult.toList())
        listOfResult.clear()
      }
    }
    if (listOfResult.isNotEmpty()) {
      trySendBlocking(listOfResult)
    }
    close()
  }
}

为了确保一切正常,我创建了一些集成测试:

  • 第一个流+分块消耗所有行,通过
  • 使用第一个流程(从 jdbc 存储库创建的流程)和 应用 take 运算符只是为了考虑几个 x 项。正确通过了。
  • 使用第一个流+分块运算符+take运算符,它永远挂起

所以最后一次测试表明实现上有问题。 我进行了很多调查,但没有发现任何有用的东西,但是,在转储线程时,我发现一个协程线程在第一个流(在 jdbc 存储库中创建的流)的 trySendBlocking 调用中被阻塞。 我想知道分块运算符应该以哪种方式将关闭传播到上游流,因为这部分似乎丢失了。 在这两种情况下,我都使用 close() 调用向下游传播数据末尾,但我查看了 take 运算符,发现它正在使用 emitAbort(...) 触发关闭 我应该在callbackFlow{...}中做类似的事情吗? 经过一番调查后,我能够避免锁定,在存储库内的 trySendBlocking 上添加超时,但我不喜欢这样。最后,我意识到我可以将原始流(在分块运算符中)投射到 SendChannel 并在下游流关闭时关闭它:

 trySendBlocking(listOfResult.toList()).onSuccess {
    LOGGER.debug("Sent")
  }.onFailure {
    LOGGER.warn("An error occurred sending data.", it)
  }.onClosed {
    LOGGER.info("Channel has been closed")
    (originalFlow as SendChannel<*>).close(it)
  }

这是关闭倒流的正确方法吗?有解决这个问题的提示吗? 谢谢!

kotlin asynchronous kotlin-coroutines
1个回答
3
投票

您不应该使用

trySendBlocking
代替
send
。如果没有使用可以处理阻塞代码的调度程序(例如 Dispatchers.Default)将其包装在
withContext
中,则永远不要在协程中使用阻塞函数。但是,当有替代挂起功能时,请使用它,在本例中
send()

此外,

callbackFlow
比转换流所需的更加复杂。您应该使用标准
flow
构建器(因此您将使用
emit()
而不是
send()
)。

fun <T> Flow<T>.chunked(chunkSize: Int): Flow<List<T>> = flow {
    var listOfResult = mutableListOf<T>()
    [email protected] {
        listOfResult.add(it)
        if (listOfResult.size == chunkSize) {
            emit(listOfResult.toList())
            listOfResult = mutableListOf()
        }
    }
    if (listOfResult.isNotEmpty()) {
        emit(listOfResult)
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.