假设我有一个输入参数列表。
val array = arrayOf(input1, input2, ... inputN) // N is usually less than 10
我必须对这些参数进行大量的计算。所以为了优化它,我试图在他自己的线程中与其他线程并发运行每个计算。我使用RxJava2来实现它。
sealed class Result {
object Success : Result()
object NotFound : Result()
}
fun processArray(arr: Array<Input>): Single<Result> {
val singles = arr.map { input ->
Single.fromCallable {
val time = System.currentTimeMillis()
val r = process(input)
log("$r, took ${System.currentTimeMillis() - time}ms")
return@fromCallable r
}
.subscribeOn(Schedulers.newThread())
}
return Single.zip(singles) { results ->
val r = results.map { it as Result }
.firstOrNull { it is Result.Success }
?: Result.NotFound
log("result is: $r")
return@zip r
}
}
fun process(input: Input): Result
一切都正常,但当我看日志时,我通常会看到以下内容。
NotFound, took 130ms
NotFound, took 300ms
Success, took 220ms
NotFound, took 78ms
NotFound, took 540ms
NotFound, took 256ms
result is Success
proccessing took 547ms
我只需要返回第一个成功的结果,这是不对的。但是这段代码会等待所有的结果完成,即使它已经找到了。Result.Success
(从日志中可以看到,整体花费的时间==547ms,因为我们在等待的项目是用了 NotFound, took 540ms
但此时此刻,我得到了 Result.Success
我知道剩下的将是NotFound)
是否可以运行多个 Single.fromCallable()
并在找到第一个成功的结果后处理其余的?
你可以用合并代替zip,然后过滤得到Success类型的第1个元素,就像这样。
sealed class Result {
object Success : Result()
object NotFound : Result()
}
fun processArray(arr: Array<Input>): Single<Result> {
val singles = arr.map { input ->
Single.fromCallable {
val time = System.currentTimeMillis()
val r = process(input)
log("$r, took ${System.currentTimeMillis() - time}ms")
return@fromCallable r
}
.subscribeOn(Schedulers.newThread())
}
return Single
.merge(singles)
.filter { it is Result.Success }
.firstElement()
.switchIfEmpty(Single.just(Result.NotFound))
}
fun process(input: Input): Result