Kotlin协程-如何阻止等待/加入所有作业?

问题描述 投票:4回答:3

我是Kotlin / Coroutines的新手,所以希望我只是缺少一些东西/不完全了解如何为我要解决的问题构建代码。

本质上,我正在获取一个字符串列表,对于列表中的每个项目,我都希望将其发送到另一种方法来工作(进行网络调用并根据响应返回数据)。 (Edit :)我希望所有调用同时启动,并阻塞直到所有调用完成/响应被执行,然后返回包含每个响应信息的新列表。

我可能还不完全了解何时使用启动/异步,但是我尝试遵循启动(使用joinAll)和异步(使用await)。

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstofReturnData

我期望发生的是,如果我的lstInputs的大小为120,在加入所有作业时,我的lstOfReturnData的大小也应该为120。

实际发生的是不一致的结果。我将运行一次,并且在最终列表中获得118,再次运行,它是120,再次运行,它是117,依此类推。在networkCallToGetData()方法中,我正在处理任何异常,至少返回一些东西对于每个请求,无论网络呼叫是否失败。

任何人都可以帮助解释为什么我得到的结果不一致,以及我需要做些什么来确保我进行适当的阻止并继续所有工作,然后再继续?

kotlin kotlinx.coroutines
3个回答
9
投票

mutableListOf()创建一个不是线程安全的ArrayList。尝试改用ConcurrentLinkedQueue

而且,您是否使用Kotlin / Kotlinx.coroutine的稳定版本(不是旧的实验版本)?在稳定版本中,由于引入了结构化并发,因此无需编写jobs.joinAll anymorelaunchrunBlocking的扩展功能,它将在runBlocking范围内启动新协程,并且runBlocking范围将自动等待所有已启动的作业完成。因此,以上代码可以简化为

val lstOfReturnData = ConcurrentLinkedQueue<response>()
runBlocking {
        lstInputs.forEach {
            launch(Dispatches.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
        }
}
return lstOfReturnData

1
投票

Runblocking应该意味着您不必致电加入。从runblocking范围内启动协程应该为您执行此操作。您是否尝试过:

fun processData(lstInputs: List<String>): List<response> {

val lstOfReturnData = mutableListOf<response>()

runBlocking {
    lstInputs.forEach {
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
   } 
}

return lstofReturnData

0
投票

runBlocking会中断当前线程,直到其完成。我想这不是你想要的。如果我认为有误,并且您想阻止当前线程,则可以摆脱协程,而只需在当前线程中进行网络调用即可:

val lstOfReturnData = mutableListOf<response>()
lstInputs.forEach {
    lstOfReturnData.add(networkCallToGetData(it))
} 

但是,如果不是您的意图,则可以执行以下操作:

class Presenter(private val uiContext: CoroutineContext = Dispatchers.Main) 
    : CoroutineScope {

    // creating local scope for coroutines
    private var job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = uiContext + job

    // call this to cancel job when you don't need it anymore
    fun detach() {
        job.cancel()
    }

    fun processData(lstInputs: List<String>) {

        launch {
            val deferredList = lstInputs.map { 
                async(Dispatchers.IO) { networkCallToGetData(it) } // runs in parallel in background thread
            }
            val lstOfReturnData = deferredList.awaitAll() // waiting while all requests are finished without blocking the current thread

            // use lstOfReturnData in Main Thread, e.g. update UI
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.