Kotlin - 如何运行n个协程并等待前m个结果或超时?

问题描述 投票:4回答:1

我正在尝试编写一个函数来启动n个协程并等到第一个m完成。如果协同程序在某些超时内未能完成,则所有协同程序/作业都将被取消。我最初的实现如下所示,但我觉得它可以改进。我最初的想法是使用父作业来运行所有其他工作,以便可以取消父作业并级联到剩余的子作业。但是,这会导致必须捕获TimeoutCancellationException。

如何编写一个函数来启动n个协程并等到第一个m完成,或者在协同程序完成之前发生超时?

private suspend fun queryAllHosts(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , maxSuccessfulHosts: Int
        , queryTimeout: Long
        , requestTimeout: Long
): ArrayList<QueryResult<ResultModel>> {
    val results = ArrayList<QueryResult<ResultModel>>()
    val rootJob = Job()

    try {
        withTimeout(queryTimeout, TimeUnit.MILLISECONDS) {
            queryFactories.map {
                async(parent = rootJob) {
                    val pagedResult = queryHost(
                            it
                            , query
                            , pageIndex
                            , requestTimeout
                    )

                    if (pagedResult.isSuccessful()) {
                        results.add(pagedResult)
                    }

                    if (results.size == maxSuccessfulHosts) {
                        rootJob.cancelAndJoin()
                        return@async
                    }
                }
            }.awaitAll()
        }
    } catch (ex: TimeoutCancellationException) {
        Log.w(Tag, "Query timed out, successful queries: ${results.size}")
    } catch (ignored: JobCancellationException) {
        // Ignored
    } catch (ex: Exception) {
        Log.w(Tag, "Unexpected exception", ex)
    }

    return results
}

UPDATE

由于并发修改异常,我对之前接受的答案没有任何好运。下面是对原始答案的轻微调整,以避免并发修改异常,但是它无法遵守自动收报机超时。

如何避免并发修改异常并仍然尊重自动收报机超时?

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = HashSet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticker = ticker(timeoutMs)

    forEach { deferred ->
        deferred.invokeOnCompletion {
            if (!deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed) Value: ${deferred.getCompleted()}")
            } else {
                Log.d(Tag, "(Completed) Exception: $it")
            }
        }
    }

    val processed = HashSet<Deferred<T>>()

    val elapsedTime = measureTimeMillis {
        whileSelect {
            toAwait.minus(processed).forEach { deferred ->
                processed.add(deferred)
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }

            ticker.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

现在使用以下代码创建延迟实例:

private fun makeRequest(
        url: String
        , timeoutMs: Int
): Document? = try {
    Jsoup.connect(url).timeout(timeoutMs).get()
} catch (ex: Exception) {
    null
}

private fun createAsyncRequests(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , timeoutMs: Int
): List<Deferred<QueryResult<TorrentResult>>> = queryFactories.map { queryFactory ->
    async(start = CoroutineStart.LAZY) {
        try {
            val url = queryFactory(query, pageIndex)
            makeRequest(
                    url
                    , timeoutMs
            ).getQueryResult(pageIndex, url)
        } catch (ex: Exception) {
            QueryResult<TorrentResult>(state = QueryResult.State.ERROR)
        }
    }
}

UPDATE

下面的日志显示提供的2,000毫秒的timoutMs未被遵守,因为超时发生在11,861毫秒:

2018-09-13 22:39:00.307 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.315 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.454 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|1/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://indiapirate.com/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.455 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.456 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.470 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|2/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.471 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.472 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.479 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|3/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|4/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.482 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.500 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|5/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.be/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|6/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay.nz/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.578 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|7/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay6.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.603 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|8/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.605 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|9/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://uktpb.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.622 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.694 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|10/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxyproxy.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.712 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|11/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://fastpirate.link/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|12/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freepirate.eu/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.480 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|13/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://pirate.tel/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.481 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.482 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|14/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratesbay.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.650 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.780 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|15/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.781 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.782 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.131 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|16/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.250 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|17/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.251 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.253 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|18/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freeproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.297 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.441 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|19/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.442 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.443 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:04.826 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|20/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:04.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|21/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:05.325 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|22/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:05.926 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|23/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.002 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|24/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.117 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|25/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:06.338 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|26/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.923 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|27/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebay.red/search/hobbit+1977/0/7
2018-09-13 22:39:07.214 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|28/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:07.625 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|29/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateway.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.398 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|30/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:08.431 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|31/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.blue/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.684 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|32/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepiratepirate.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.033 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|33/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://unblocktpb.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.759 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|34/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:09.879 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|35/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbunblock.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.961 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|36/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.554 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|37/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:10.715 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|38/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://Piratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.855 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|39/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:11.014 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|40/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.fun/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.298 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|41/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.313 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|42/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.review/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.932 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|43/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.life/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.166 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|44/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.one/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.168 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Elapsed time: 11861
2018-09-13 22:39:12.885 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|45/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@b129740
2018-09-13 22:39:13.437 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|46/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c801679
2018-09-13 22:39:14.051 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|47/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@3f89fbe
2018-09-13 22:39:14.154 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|48/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c7c181f
2018-09-13 22:39:14.658 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|49/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@729e76c
2018-09-13 22:39:17.334 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|50/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@ae2a135

使用日志记录更新了代码(我删除了TimeoutException并返回false):

suspend fun <T> List<Deferred<T>>.awaitCount(
        count: Int
        , timeoutMs: Long
): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)
    var completedCount = 0

    forEach { deferred ->
        deferred.invokeOnCompletion {
            completedCount++

            if (deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed|$completedCount/$size) Exception: $it")
            } else {
                Log.d(Tag, "(Completed|$completedCount/$size) Value: ${deferred.getCompleted()}")
            }
        }
    }

    val elapsedTime = measureTimeMillis {
        whileSelect {
            ticket.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }

            toAwait.forEach { deferred ->
                Log.d(Tag, "Starting deferred..")
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}
kotlin kotlinx.coroutines
1个回答
5
投票

可以通过避免使用额外的已启动任务和根作业来改进它。

kotlinx.coroutines为这些复杂的运算符提供了select子句,完全适合您的用例。而且,很容易概括:

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)

    whileSelect {
        toAwait.forEach { deferred ->
            deferred.onAwait {
                toAwait.remove(deferred)
                result.add(it)
                result.size != count
            }
        }

        ticket.onReceive {
            val e = TimeoutException()
            toAwait.forEach { it.cancel(e) }
            throw e
        }
    }

    return result
}

然后你可以在queryAllHosts中使用它:

val queries = queryFactories.map { queryHost(...) }
return queries.awaitCount(maxSuccessfulHosts, queryTimeout)

您可以按照自己的方式调整awaitCount,例如专门为Retrofit和添加isSuccessful检查

© www.soinside.com 2019 - 2024. All rights reserved.