Kotlin 测试将工作分派到多个渠道

问题描述 投票:0回答:1

我目前已经编写了一些代码,根据设置的工作人员数量将消息发送到 x 个通道。它似乎工作正常,但是当我尝试测试它时,当我不添加延迟时它会卡住,这看起来有点hacky。

我的频道调度员:

fun <INPUT_TYPE, OUTPUT_TYPE> CoroutineScope.dispatchWorkConcurrently(
    inputChannel: ReceiveChannel<INPUT_TYPE>,
    processWork: suspend (INPUT_TYPE, SendChannel<OUTPUT_TYPE>) -> Unit,
    logger: Logger,
    nameOfWorkers: String,
    maxWorkers: Int,
    workerCounter: AtomicInteger
): ReceiveChannel<OUTPUT_TYPE> {
    val coroutineScope = this + SupervisorJob()

    suspend fun launchWorker(
        input: Channel<INPUT_TYPE>,
        output: Channel<OUTPUT_TYPE>
    ) {
        val coroutineExceptionHandler = CoroutineExceptionHandler { _, throwable ->
            workerCounter.decrementAndGet()
            logger.error("Exception in $nameOfWorkers: ${throwable.localizedMessage}")
        }
        val newCounterValue = workerCounter.incrementAndGet()
        coroutineScope.launch(coroutineExceptionHandler + CoroutineName("$nameOfWorkers $newCounterValue")) {
            for (work in input) {
                processWork(work, output)
            }
        }
    }

    val internalChannel = Channel<INPUT_TYPE>()
    val outputChannel = Channel<OUTPUT_TYPE>()

    coroutineScope.launch {
        for (message in inputChannel) {
            if (!internalChannel.trySend(message).isSuccess) {
                val allowedToLaunchNewWorker = workerCounter.get() < maxWorkers
                if (allowedToLaunchNewWorker) {
                    launchWorker(internalChannel, outputChannel)
                }

                internalChannel.send(message)
            }
        }
    }

    return outputChannel
}

单元测试:

@Test
fun `correctly dispatch jobs with multiple workers`() = runBlocking {
    val charPool : List<Char> = ('a'..'z').toList()
    val count = AtomicInteger(0);
    suspend fun processWork(message: Char, output: SendChannel<String>) {
        delay(1)
        println("Does some work with: $message")
        count.getAndIncrement()
    }

    this.dispatchWorkConcurrently(
        inputChannel = inputMessageChannel,
        processWork = ::processWork,
        logger = LoggerFactory.getLogger(ChannelUtilTest::class.java),
        nameOfWorkers = javaClass.simpleName,
        maxWorkers = 10,
        workerCounter = workerCounter
    )

    charPool.forEach {
        inputMessageChannel.send(it)
    }
    delay(20)

    assertEquals(26, count.get())
    assertEquals(10, workerCounter.get())
}

如何在

delay(20)
之后不添加
forEach
的情况下正确测试它?

kotlin kotlin-coroutines
1个回答
1
投票

首先,您当前的测试不起作用的原因是,如果没有

delay()
,您不会等待工作协程(由
dispatchWorkConcurrently
启动)完成。这对于您当前的总体设计来说很难做到。

dispatchWorkConcurrently
的设计实际上并不允许您等待,因为它通过在层次结构之外创建新作业(
SupervisorJob
)(未指定父作业)来破坏结构化并发性。

通常,您应该努力在低级别创建挂起函数,而不是使用

CoroutineScope
接收器并在外部提供的作用域中启动协程。您可以通过使用
supervisorScope { ... }
并在其中本地启动协程来分解并发工作,但您的常规功能仍然等待一切。

此外,正如我在评论中提到的,协程很便宜,因此当您检测到所有工作人员都很忙时,您实际上不需要动态启动它们。您可以从一开始就启动

maxWorkers
协程。

总而言之,它可能看起来像这样:

suspend fun <INPUT_TYPE, OUTPUT_TYPE> dispatchWorkConcurrently(
    inputChannel: ReceiveChannel<INPUT_TYPE>,
    processMessage: suspend (INPUT_TYPE, SendChannel<OUTPUT_TYPE>) -> Unit,
    logger: Logger,
    nameOfWorkers: String,
    maxWorkers: Int,
): ReceiveChannel<OUTPUT_TYPE> = supervisorScope {

    val outputChannel = Channel<OUTPUT_TYPE>()

    val coroutineExceptionHandler = CoroutineExceptionHandler { _, throwable ->
        logger.error("Exception in $nameOfWorkers: ${throwable.localizedMessage}")
    }

    repeat(maxWorkers) { workerNum ->
        launch(coroutineExceptionHandler + CoroutineName("$nameOfWorkers $workerNum")) {
            for (msg in inputChannel) {
                processMessage(msg, outputChannel)
            }
        }
    }

    outputChannel
}

这意味着

dispatchWorkConcurrently
现在等待所有子协程完成,因此调用者如果想要与它同时执行操作,则需要在协程中调用它。您的测试可能会变成:

@Test
fun `correctly dispatch jobs with multiple workers`() = runBlocking {
    val charPool : List<Char> = ('a'..'z').toList()
    val count = AtomicInteger(0)

    suspend fun processMessage(message: Char, output: SendChannel<String>) {
        delay(1)
        println("Does some work with: $message")
        count.getAndIncrement()
    }

    val job = launch(Dispatchers.Default) {
        dispatchWorkConcurrently(
            inputChannel = inputMessageChannel,
            processMessage = ::processMessage,
            logger = LoggerFactory.getLogger(ChannelUtilTest::class.java),
            nameOfWorkers = javaClass.simpleName,
            maxWorkers = 10,
        )
    }

    charPool.forEach {
        inputMessageChannel.send(it)
    }
    inputMessageChannel.close()
    job.join()

    assertEquals(26, count.get())
}
© www.soinside.com 2019 - 2024. All rights reserved.