所以,我的服务层中有一个外部 Api 调用和 2 个繁重的数据库查询,阻塞代码如下所示
override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {
val incomes1: TransactionSummaryDao? =
service1.func(
retailerMsisdn, month, year
) // a heavy query runs under the hood
val income2: Double = service2.func(
retailerMsisdn, month, year
) // another heavy query runs under the hood
val totalSimSale: Double = externalApiService.apiCall(
retailerMsisdn, month, year
) // okHttp api call
return SummaryDto().apply {
this.incomeX = income1
this.incomeY = income2
this.simSale = simsale
}
}
我正在考虑创建一个线程池 bean 并使用执行器同时进行调用,但是这个线程池在应用程序之外将受到限制,我想到每个请求使用 2 个线程来处理这个场景,
override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {
val executor = ThreadPoolTaskExecutor()
executor.corePoolSize = 2
executor.maxPoolSize = 2
executor.setQueueCapacity(3)
executor.setTaskDecorator(MdcTaskDecorator())
executor.setThreadNamePrefix(Constant.NAME_ASYNC_THREAD_PREFIX)
executor.setWaitForTasksToCompleteOnShutdown(true)
executor.initialize()
val incomes1: CompleteableFutur<Dao> =CompletableFuture.supplyAsync(
{
service1.func(
retailerMsisdn, month, year
) // a heavy query runs under the hood (IO)
}, executor )
val income2: CompleteableFutur<Double> = CompletableFuture.supplyAsync(
{
service2.func(
retailerMsisdn, month, year
) // another heavy query runs under the hood ( IO)
}, executor)
val totalSale =
externalApiService.apiCall(
retailerMsisdn, month, year
) // okHttp api call (IO)
return SummaryDto().apply {
this.incomeX = income1.join()
this.incomeY = income2.join()
this.simSale = simsale.join()
}
}
这是一个很好的方法吗,还是我应该考虑使用 kotlin 协程和挂起函数?调用者线程可以在不使用额外的调度程序线程的情况下处理全部 3 个 IO 操作吗?我知道额外的线程创建成本很高,所以也要记住这一点。该应用程序的用户接近100万,每秒可以有1000个请求
考虑仅使用调用者线程来使用 kotlin 协程
override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {
return runBlocking {
val income1: Dao? = async {
service1.func(retailerMsisdn, month, year)
// a heavy query runs under the hood
}
val income2: Double = async {
service2.func(retailerMsisdn, month, year)
// another heavy query runs under the hood
}
val simSale: Double = async {
externalApiService.apiCall(retailerMsisdn, month, year)
// OkHttp API call
}
SummaryDto().apply {
this.incomeX = income1.await()
this.incomeY = income2.await()
this.simSale = simSale.await()
}
}
}
首先,请注意避免每次使用执行器方法时都创建线程池,这可能会导致成本高昂。
除此之外,在线程数量相同的情况下,使用协程也能发挥同样的作用,而且好处是更方便编写。 但是,您在协程版本中没有使用相同数量的线程。正如您所说,您仅使用调用者线程,这意味着一次最多可以执行一个协程(没有并行性,只有并发性)。反过来,这意味着一个繁重的 IO 将阻塞唯一的线程,并阻止其他线程运行,因此执行速度会比第一种方法慢(但内存较少)。
您可以使用预定义的
Dispatchers.IO
,而不是仅使用调用线程,并且您仍然可以使用 Dispatchers.IO.limitedParallelism(2)
来限制每个请求使用的线程数(如果您认为这是可取的):
override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {
return runBlocking {
val income1: Dao? = async(Dispatchers.IO) {
service1.func(retailerMsisdn, month, year)
// a heavy query runs under the hood
}
val income2: Double = async(Dispatchers.IO) {
service2.func(retailerMsisdn, month, year)
// another heavy query runs under the hood
}
val simSale: Double = async {
externalApiService.apiCall(retailerMsisdn, month, year)
// OkHttp API call
}
SummaryDto().apply {
this.incomeX = income1.await()
this.incomeY = income2.await()
this.simSale = simSale.await()
}
}
}
请注意,您可以通过不指定调度程序来选择仍然使用调用者线程。在上面的代码中,我在调用者线程下运行第三个调用(API 调用),就像在您的执行程序示例中一样,而另外 2 个调用则在 IO 调度程序上运行。我没有使用
limitedParallelism(2)
,因为如果我们只启动 2 个协程,那就多余了。
话虽如此,我不会在这里使用
runBlocking
。相反,您应该使函数本身成为 suspend
,并将其冒泡到 Spring 控制器(如果您使用 webflux,它们确实支持 suspend
函数)。