我目前已经编写了一些代码,根据设置的工作人员数量将消息发送到 x 个通道。它似乎工作正常,但是当我尝试测试它时,当我不添加延迟时它会卡住,这看起来有点hacky。
我的频道调度员:
fun <INPUT_TYPE, OUTPUT_TYPE> CoroutineScope.dispatchWorkConcurrently(
inputChannel: ReceiveChannel<INPUT_TYPE>,
processWork: suspend (INPUT_TYPE, SendChannel<OUTPUT_TYPE>) -> Unit,
logger: Logger,
nameOfWorkers: String,
maxWorkers: Int,
workerCounter: AtomicInteger
): ReceiveChannel<OUTPUT_TYPE> {
val coroutineScope = this + SupervisorJob()
suspend fun launchWorker(
input: Channel<INPUT_TYPE>,
output: Channel<OUTPUT_TYPE>
) {
val coroutineExceptionHandler = CoroutineExceptionHandler { _, throwable ->
workerCounter.decrementAndGet()
logger.error("Exception in $nameOfWorkers: ${throwable.localizedMessage}")
}
val newCounterValue = workerCounter.incrementAndGet()
coroutineScope.launch(coroutineExceptionHandler + CoroutineName("$nameOfWorkers $newCounterValue")) {
for (work in input) {
processWork(work, output)
}
}
}
val internalChannel = Channel<INPUT_TYPE>()
val outputChannel = Channel<OUTPUT_TYPE>()
coroutineScope.launch {
for (message in inputChannel) {
if (!internalChannel.trySend(message).isSuccess) {
val allowedToLaunchNewWorker = workerCounter.get() < maxWorkers
if (allowedToLaunchNewWorker) {
launchWorker(internalChannel, outputChannel)
}
internalChannel.send(message)
}
}
}
return outputChannel
}
单元测试:
@Test
fun `correctly dispatch jobs with multiple workers`() = runBlocking {
val charPool : List<Char> = ('a'..'z').toList()
val count = AtomicInteger(0);
suspend fun processWork(message: Char, output: SendChannel<String>) {
delay(1)
println("Does some work with: $message")
count.getAndIncrement()
}
this.dispatchWorkConcurrently(
inputChannel = inputMessageChannel,
processWork = ::processWork,
logger = LoggerFactory.getLogger(ChannelUtilTest::class.java),
nameOfWorkers = javaClass.simpleName,
maxWorkers = 10,
workerCounter = workerCounter
)
charPool.forEach {
inputMessageChannel.send(it)
}
delay(20)
assertEquals(26, count.get())
assertEquals(10, workerCounter.get())
}
如何在
delay(20)
之后不添加 forEach
的情况下正确测试它?
首先,您当前的测试不起作用的原因是,如果没有
delay()
,您不会等待工作协程(由dispatchWorkConcurrently
启动)完成。这对于您当前的总体设计来说很难做到。
dispatchWorkConcurrently
的设计实际上并不允许您等待,因为它通过在层次结构之外创建新作业(SupervisorJob
)(未指定父作业)来破坏结构化并发性。
通常,您应该努力在低级别创建挂起函数,而不是使用
CoroutineScope
接收器并在外部提供的作用域中启动协程。您可以通过使用 supervisorScope { ... }
并在其中本地启动协程来分解并发工作,但您的常规功能仍然等待一切。
此外,正如我在评论中提到的,协程很便宜,因此当您检测到所有工作人员都很忙时,您实际上不需要动态启动它们。您可以从一开始就启动
maxWorkers
协程。
总而言之,它可能看起来像这样:
suspend fun <INPUT_TYPE, OUTPUT_TYPE> dispatchWorkConcurrently(
inputChannel: ReceiveChannel<INPUT_TYPE>,
processMessage: suspend (INPUT_TYPE, SendChannel<OUTPUT_TYPE>) -> Unit,
logger: Logger,
nameOfWorkers: String,
maxWorkers: Int,
): ReceiveChannel<OUTPUT_TYPE> = supervisorScope {
val outputChannel = Channel<OUTPUT_TYPE>()
val coroutineExceptionHandler = CoroutineExceptionHandler { _, throwable ->
logger.error("Exception in $nameOfWorkers: ${throwable.localizedMessage}")
}
repeat(maxWorkers) { workerNum ->
launch(coroutineExceptionHandler + CoroutineName("$nameOfWorkers $workerNum")) {
for (msg in inputChannel) {
processMessage(msg, outputChannel)
}
}
}
outputChannel
}
这意味着
dispatchWorkConcurrently
现在等待所有子协程完成,因此调用者如果想要与它同时执行操作,则需要在协程中调用它。您的测试可能会变成:
@Test
fun `correctly dispatch jobs with multiple workers`() = runBlocking {
val charPool : List<Char> = ('a'..'z').toList()
val count = AtomicInteger(0)
suspend fun processMessage(message: Char, output: SendChannel<String>) {
delay(1)
println("Does some work with: $message")
count.getAndIncrement()
}
val job = launch(Dispatchers.Default) {
dispatchWorkConcurrently(
inputChannel = inputMessageChannel,
processMessage = ::processMessage,
logger = LoggerFactory.getLogger(ChannelUtilTest::class.java),
nameOfWorkers = javaClass.simpleName,
maxWorkers = 10,
)
}
charPool.forEach {
inputMessageChannel.send(it)
}
inputMessageChannel.close()
job.join()
assertEquals(26, count.get())
}