如何在kotlin中实现序列的并行映射

问题描述 投票:2回答:1

我正在尝试在Kotlin中实现Iterable和Sequence的并行实现。我有一个小文件,它包含4个扩展函数,但第三个给了我一个编译器错误:

suspend fun <T, R> Iterable<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

suspend fun <T> Iterable<T>.parallelForEach(block: suspend (T) -> Unit) =
    coroutineScope { map { async { block(it) } }.forEach { it.await() } }

suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

suspend fun <T> Sequence<T>.parallelForEach(block: suspend (T) -> Unit) =
    coroutineScope { map { async { block(it) } }.forEach { it.await() } }

编译器返回并说暂停函数只能在暂停函数内部调用。有没有办法实现这个?

编辑:修复了错误的复制/粘贴

Edit2:我想到了一个实现:

suspend fun <T, R> Sequence<T>.parrallelMap(block: suspend (T) -> R) =
        asIterable().map { coroutineScope { async { block(it) } } }
              .asSequence().map { runBlocking { it.await() } }

我希望这会解雇所有暂停功能并等待他们懒散。我只是不确定这是否安全,或者这样可以节省时间。

kotlin parallel-processing sequence
1个回答
2
投票

延迟序列的并行执行的核心语义存在问题。在迭代生成的序列之前,您当前的实现不会启动block(it)

suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

请考虑以下示例:

sequenceOf(1, 2, 3).parallelMap { it * it }.forEach { println(it) }

对于此示例,执行顺序为

val p1 = async { 1 * 1 } 
val r1 = p1.await()
println(r1)
val p2 = async { 2 * 2 } 
val r2 = p2.await()
println(r2)
val p3 = async { 3 * 3 } 
val r3 = p3.await()
println(r3)

请注意,映射操作的执行是截止的,而不是并行的。

编译器告诉你的是Sequence<T>.map {}的lambda是在调用的上下文之外按需延迟执行的(读取:在你的协程之外),所以你不能使用你当前所使用的协同程序。

坦率地说,我不确定如何同时执行延迟计算和并行执行。

© www.soinside.com 2019 - 2024. All rights reserved.