如何在 Kotlin 序列上进行平行折叠?

问题描述 投票:0回答:1

作为帮助我学习 Kotlin 的练习,我正在编写一个应用程序,其中有大量对象序列,我想根据一组标准找到“最佳”对象,最明显的方法是使用一个折叠。

我在其他语言中做了同样的事情,特别是 Java 和 Haskell,在这两种语言中,我都可以通过并行折叠来实现性能上的巨大改进 - 即,将序列分割成块,单独折叠每个块并在其中折叠。并行,然后合并结果。

在Java中(假设

objs
是一个包含我想要折叠的对象的
Stream
),这很简单:

objs.parallel().reduce(this::findBetter);

然后为我处理分块和平行折叠。

我不知道如何在 Kotlin 中做到这一点。我知道使用

chunked
方法将序列拆分为块序列很容易,然后我可以将该序列转换为块流,然后我认为下一阶段是将每个块映射到流向异步折叠 chunk 的结果;然后我可以折叠中间结果流以获得最终结果。但我尝试过的一切似乎在某些时候都无法编译。

这是一个好方法吗?如果是的话,如何编码 - 特别是关于异步折叠块的部分?

kotlin kotlin-coroutines fold
1个回答
0
投票

这个问题实际上比看起来更棘手。这听起来像是对一项功能的简单请求,该功能应该就在那里并且可以正常工作。然而,序列和流程在设计上都是连续且有序的。并发处理不是他们的目标,他们不支持这样的功能。我相信有计划为流量提供它,但我们还没有实现。

此外,我们需要意识到添加此类功能并不那么简单,因为没有单一的方法可以做到这一点。如果序列/流是有序的,我们是否应该完全忽略并行处理的排序,或者我们是否需要提供一些排序保证?我们应该如何在工人之间分配工作?如何调度并发任务:线程、执行器、协程?预期的并发水平是多少?等等

假设我们想使用协程,我们完全忽略排序,并且按照您所说的那样对数据进行分块。最简单的解决方案是基于集合 - 正如您所说,我们需要对数据进行分块,异步处理每个块,然后加入:

suspend fun <T> Iterable<T>.parallelReduce(chunkSize: Int, operation: (T, T) -> T): T = coroutineScope {
    chunked(chunkSize)
        .map { async { it.reduce(operation) } }
        .awaitAll()
        .reduce(operation)
}

我们可以通过以下方式将其与序列一起使用:

seq.asIterable().parallelReduce(...)
。当然,该解决方案在初始步骤中消耗了整个序列。如果序列非常大并且我们无法将所有项目保留在内存中,我们需要另一种解决方案。

我们可以将序列转换为流。 Flow 与序列非常相似,但它允许在其运算符中使用协程,并且它还提供了一种通过在中间放置 buffer() 将管道拆分为两个并发管道的方法:

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    require(concurrency >= 2)
    chunked(chunkSize).asFlow()
        .map { async { it.reduce(operation) } }
        .buffer(concurrency - 2) // buffer of 0 already provides concurrency of 2
        .map { it.await() }
        .reduce(operation)
}

本质上,它的工作方式与第一个示例类似,但它不是立即调度所有块,而是一次仅启动有限数量的任务。请注意,此解决方案仍然按顺序而不是同时使用结果。这意味着如果第一个块比其他块花费的时间长得多,那么在后续块完成后,我们将不会消耗它们的结果,也不会启动另一个块,直到第一个块解锁整个管道。

而且,我们这样使用流程感觉有点奇怪。它们更多地是为组件间通信而设计的,而不是作为并发工具。我们在

flow.map()
中启动协程感觉很奇怪。或者,我们可以通过使用通道并启动工作协程来完全手动完成:

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    val inputsChannel = produce { chunked(chunkSize).forEach { send(it) } }
    val resultsChannel = Channel<T>()
    val result = async {
        var acc = resultsChannel.receive()
        for (item in resultsChannel) {
            acc = operation(acc, item)
        }
        acc
    }
    coroutineScope {
        repeat(concurrency) {
            launch {
                for (inputs in inputsChannel) {
                    resultsChannel.send(inputs.reduce(operation))
                }
            }
        }
    }
    resultsChannel.close()
    result.await()
}

该解决方案同时处理和使用结果,因此它没有上述缺点。

作为奖励:我所说的关于不支持并发处理的流并不完全正确。有一个运算符实际上提供了并发处理:flatMapMerge()。感觉它不太适合整个API,就像是偶然添加的一样。无论如何,它允许非常轻松地使用并发处理:

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    chunked(chunkSize).asFlow()
        .flatMapMerge(concurrency) {
            flow { emit(it.reduce(operation)) }
        }
        .reduce(operation)
}

我认为这是某种诡计。目前还不清楚这段代码的作用,特别是:它是如何做到的。但它确实有效,它需要最少的代码,而且我相信它可以最佳地安排处理。

© www.soinside.com 2019 - 2024. All rights reserved.