我正在使用async / await创建数千个HTTP请求,并希望有一个进度指示器。我以一种天真的方式添加了一个,但注意到当所有请求完成时,计数器值永远不会达到总计。所以我创建了一个简单的测试,当然,它不能按预期工作:
fun main(args: Array<String>) {
var i = 0
val range = (1..100000)
range.map {
launch {
++i
}
}
println("$i ${range.count()}")
}
输出是这样的,第一个数字总是在变化:
98800 100000
我可能在JVM / Kotlin中遗漏了一些关于并发/同步的重要细节,但不知道从哪里开始。有小费吗?
更新:我最终使用了Marko建议的频道:
/**
* Asynchronously fetches stats for all symbols and sends a total number of requests
* to the `counter` channel each time a request completes. For example:
*
* val counterActor = actor<Int>(UI) {
* var counter = 0
* for (total in channel) {
* progressLabel.text = "${++counter} / $total"
* }
* }
*/
suspend fun getAssetStatsWithProgress(counter: SendChannel<Int>): Map<String, AssetStats> {
val symbolMap = getSymbols()?.let { it.map { it.symbol to it }.toMap() } ?: emptyMap()
val total = symbolMap.size
return symbolMap.map { async { getAssetStats(it.key) } }
.mapNotNull { it.await().also { counter.send(total) } }
.map { it.symbol to it }
.toMap()
}
解释究竟是什么让你的错误方法失败是次要的:主要的是修复方法。
对于这种通信模式,您应该使用async-await
而不是launch
或actor
,所有HTTP作业都将其状态发送到该import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlin.system.*
import kotlin.coroutines.experimental.*
object IncCounter
fun counterActor() = actor<IncCounter>(UI) {
var counter = 0
for (msg in channel) {
updateView(++counter)
}
}
fun main(args: Array<String>) = runBlocking {
val counter = counterActor()
massiveRun(CommonPool) {
counter.send(IncCounter)
}
counter.close()
println("View state: $viewState")
}
// Everything below is mock code that supports the example
// code above:
val UI = newSingleThreadContext("UI")
fun updateView(newVal: Int) {
viewState = newVal
}
var viewState = 0
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val numCoroutines = 1000
val repeatActionCount = 1000
val time = measureTimeMillis {
val jobs = List(numCoroutines) {
launch(context) {
repeat(repeatActionCount) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${numCoroutines * repeatActionCount} actions in $time ms")
}
。这将自动处理所有并发问题。
以下是一些示例代码,取自您在评论中提供的链接,并根据您的用例进行了调整。而不是某些第三方要求它获取计数器值并使用它更新GUI,actor在UI上下文中运行并更新GUI本身:
Completed 1000000 actions in 2189 ms
View state: 1000000
运行它打印
i++
你丢失写入因为i
不是原子操作 - 必须读取,递增,然后写回值 - 并且你有多个线程同时读取和写入launch
。 (如果不向AtomicInteger
提供上下文,则默认使用线程池。)
每当两个线程读取相同的值时,您将从计数中丢失1,然后它们都会写入该值加1。
以某种方式同步,例如通过使用fun main(args: Array<String>) {
val i = AtomicInteger(0)
val range = (1..100000)
range.map {
launch {
i.incrementAndGet()
}
}
println("$i ${range.count()}") // 100000 100000
}
解决了这个问题:
launch
在打印结果和程序结束时,也无法保证这些后台线程将完成它们的工作 - 您可以通过在runBlocking
中添加一个非常小的延迟(几毫秒)来轻松测试它。有了这个,最好将这一切包装在一个fun main(args: Array<String>) = runBlocking {
val i = AtomicInteger(0)
val range = (1..100000)
val jobs: List<Job> = range.map {
launch {
i.incrementAndGet()
}
}
jobs.forEach { it.join() }
println("$i ${range.count()}") // 100000 100000
}
调用中,这将使主线程保持活动状态,然后等待协同程序全部完成:
Coroutines basics
你读过val c = AtomicInteger()
for (i in 1..1_000_000)
launch {
c.addAndGet(i)
}
println(c.get())
吗?与你的问题完全相同:
launch
这个例子在不到一秒的时间内就完成了,但是它打印了一些任意数字,因为一些协程在main()打印结果之前没有完成。
因为println
没有封锁,所以不能保证所有的协同程序都会在async
之前完成。你需要使用Deferred
,存储await
对象和qazxswpoi完成它们。