Kotlin coroutines进度计数器

问题描述 投票:3回答:3

我正在使用async / await创建数千个HTTP请求,并希望有一个进度指示器。我以一种天真的方式添加了一个,但注意到当所有请求完成时,计数器值永远不会达到总计。所以我创建了一个简单的测试,当然,它不能按预期工作:

fun main(args: Array<String>) {
    var i = 0
    val range = (1..100000)
    range.map {
        launch {
            ++i
        }
    }
    println("$i ${range.count()}")
}

输出是这样的,第一个数字总是在变化:

98800 100000

我可能在JVM / Kotlin中遗漏了一些关于并发/同步的重要细节,但不知道从哪里开始。有小费吗?

更新:我最终使用了Marko建议的频道:

/**
 * Asynchronously fetches stats for all symbols and sends a total number of requests
 * to the `counter` channel each time a request completes. For example:
 *
 *     val counterActor = actor<Int>(UI) {
 *         var counter = 0
 *         for (total in channel) {
 *             progressLabel.text = "${++counter} / $total"
 *         }
 *     }
 */
suspend fun getAssetStatsWithProgress(counter: SendChannel<Int>): Map<String, AssetStats> {
    val symbolMap = getSymbols()?.let { it.map { it.symbol to it }.toMap() } ?: emptyMap()
    val total = symbolMap.size
    return symbolMap.map { async { getAssetStats(it.key) } }
        .mapNotNull { it.await().also { counter.send(total) } }
        .map { it.symbol to it }
        .toMap()
}
kotlin counter progress coroutine kotlin-coroutines
3个回答
2
投票

解释究竟是什么让你的错误方法失败是次要的:主要的是修复方法。

对于这种通信模式,您应该使用async-await而不是launchactor,所有HTTP作业都将其状态发送到该import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.channels.* import kotlin.system.* import kotlin.coroutines.experimental.* object IncCounter fun counterActor() = actor<IncCounter>(UI) { var counter = 0 for (msg in channel) { updateView(++counter) } } fun main(args: Array<String>) = runBlocking { val counter = counterActor() massiveRun(CommonPool) { counter.send(IncCounter) } counter.close() println("View state: $viewState") } // Everything below is mock code that supports the example // code above: val UI = newSingleThreadContext("UI") fun updateView(newVal: Int) { viewState = newVal } var viewState = 0 suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) { val numCoroutines = 1000 val repeatActionCount = 1000 val time = measureTimeMillis { val jobs = List(numCoroutines) { launch(context) { repeat(repeatActionCount) { action() } } } jobs.forEach { it.join() } } println("Completed ${numCoroutines * repeatActionCount} actions in $time ms") } 。这将自动处理所有并发问题。

以下是一些示例代码,取自您在评论中提供的链接,并根据您的用例进行了调整。而不是某些第三方要求它获取计数器值并使用它更新GUI,actor在UI上下文中运行并更新GUI本身:

Completed 1000000 actions in 2189 ms
View state: 1000000

运行它打印

i++

2
投票

你丢失写入因为i不是原子操作 - 必须读取,递增,然后写回值 - 并且你有多个线程同时读取和写入launch。 (如果不向AtomicInteger提供上下文,则默认使用线程池。)

每当两个线程读取相同的值时,您将从计数中丢失1,然后它们都会写入该值加1。

以某种方式同步,例如通过使用fun main(args: Array<String>) { val i = AtomicInteger(0) val range = (1..100000) range.map { launch { i.incrementAndGet() } } println("$i ${range.count()}") // 100000 100000 } 解决了这个问题:

launch

在打印结果和程序结束时,也无法保证这些后台线程将完成它们的工作 - 您可以通过在runBlocking中添加一个非常小的延迟(几毫秒)来轻松测试它。有了这个,最好将这一切包装在一个fun main(args: Array<String>) = runBlocking { val i = AtomicInteger(0) val range = (1..100000) val jobs: List<Job> = range.map { launch { i.incrementAndGet() } } jobs.forEach { it.join() } println("$i ${range.count()}") // 100000 100000 } 调用中,这将使主线程保持活动状态,然后等待协同程序全部完成:

Coroutines basics

1
投票

你读过val c = AtomicInteger() for (i in 1..1_000_000) launch { c.addAndGet(i) } println(c.get()) 吗?与你的问题完全相同:

launch

这个例子在不到一秒的时间内就完成了,但是它打印了一些任意数字,因为一些协程在main()打印结果之前没有完成。

因为println没有封锁,所以不能保证所有的协同程序都会在async之前完成。你需要使用Deferred,存储await对象和qazxswpoi完成它们。

© www.soinside.com 2019 - 2024. All rights reserved.