如何利用协程/completeableFuture<> spring boot应用程序来提高吞吐量

问题描述 投票:0回答:1

所以,我的服务层中有一个外部 Api 调用和 2 个繁重的数据库查询,阻塞代码如下所示

    override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {


        val incomes1: TransactionSummaryDao? =
            service1.func(
                retailerMsisdn, month, year
            ) // a heavy query runs under the hood

        val income2: Double = service2.func(
            retailerMsisdn, month, year
        ) // another heavy query runs under the hood


        val totalSimSale: Double = externalApiService.apiCall(
            retailerMsisdn, month, year
        )  // okHttp api call 

        return SummaryDto().apply {
            this.incomeX = income1
            this.incomeY = income2
            this.simSale = simsale
        }
    }

我正在考虑创建一个线程池 bean 并使用执行器同时进行调用,但是这个线程池在应用程序之外将受到限制,我想到每个请求使用 2 个线程来处理这个场景,

    override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {

        val executor = ThreadPoolTaskExecutor()
        executor.corePoolSize = 2
        executor.maxPoolSize = 2
        executor.setQueueCapacity(3)
        executor.setTaskDecorator(MdcTaskDecorator())
        executor.setThreadNamePrefix(Constant.NAME_ASYNC_THREAD_PREFIX)
        executor.setWaitForTasksToCompleteOnShutdown(true)
        executor.initialize()

        val incomes1: CompleteableFutur<Dao> =CompletableFuture.supplyAsync(
          {
            service1.func(
                retailerMsisdn, month, year
            ) // a heavy query runs under the hood (IO)
          }, executor )

        val income2: CompleteableFutur<Double>  = CompletableFuture.supplyAsync(
          {
            service2.func(
               retailerMsisdn, month, year
            ) // another heavy query runs under the hood ( IO)
          }, executor) 

        val totalSale =  
externalApiService.apiCall(
            retailerMsisdn, month, year
        )  // okHttp api call  (IO)

        return SummaryDto().apply {
            this.incomeX = income1.join()
            this.incomeY = income2.join()
            this.simSale = simsale.join()
        }
    }

这是一个很好的方法吗,还是我应该考虑使用 kotlin 协程和挂起函数?调用者线程可以在不使用额外的调度程序线程的情况下处理全部 3 个 IO 操作吗?我知道额外的线程创建成本很高,所以也要记住这一点。该应用程序的用户接近100万,每秒可以有1000个请求

考虑仅使用调用者线程来使用 kotlin 协程

override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {
    return runBlocking {
        val income1: Dao? = async {
            service1.func(retailerMsisdn, month, year)
            // a heavy query runs under the hood
        }

        val income2: Double = async {
            service2.func(retailerMsisdn, month, year)
            // another heavy query runs under the hood
        }

        val simSale: Double = async {
            externalApiService.apiCall(retailerMsisdn, month, year)
            // OkHttp API call
        }

        SummaryDto().apply {
            this.incomeX = income1.await()
            this.incomeY = income2.await()
            this.simSale = simSale.await()
        }
    }
}
 
spring-boot kotlin concurrency coroutine completable-future
1个回答
0
投票

首先,请注意避免每次使用执行器方法时都创建线程池,这可能会导致成本高昂。

除此之外,在线程数量相同的情况下,使用协程也能发挥同样的作用,而且好处是更方便编写。 但是,您在协程版本中没有使用相同数量的线程。正如您所说,您仅使用调用者线程,这意味着一次最多可以执行一个协程(没有并行性,只有并发性)。反过来,这意味着一个繁重的 IO 将阻塞唯一的线程,并阻止其他线程运行,因此执行速度会比第一种方法慢(但内存较少)。

您可以使用预定义的

Dispatchers.IO
,而不是仅使用调用线程,并且您仍然可以使用
Dispatchers.IO.limitedParallelism(2)
来限制每个请求使用的线程数(如果您认为这是可取的):

override fun getTotalIncomeByMonth(retailerMsisdn: String, month: Int, year: Int): SummaryDto {
    return runBlocking {
        val income1: Dao? = async(Dispatchers.IO) {
            service1.func(retailerMsisdn, month, year)
            // a heavy query runs under the hood
        }

        val income2: Double = async(Dispatchers.IO) {
            service2.func(retailerMsisdn, month, year)
            // another heavy query runs under the hood
        }

        val simSale: Double = async {
            externalApiService.apiCall(retailerMsisdn, month, year)
            // OkHttp API call
        }

        SummaryDto().apply {
            this.incomeX = income1.await()
            this.incomeY = income2.await()
            this.simSale = simSale.await()
        }
    }
}

请注意,您可以通过不指定调度程序来选择仍然使用调用者线程。在上面的代码中,我在调用者线程下运行第三个调用(API 调用),就像在您的执行程序示例中一样,而另外 2 个调用则在 IO 调度程序上运行。我没有使用

limitedParallelism(2)
,因为如果我们只启动 2 个协程,那就多余了。

话虽如此,我不会在这里使用

runBlocking
。相反,您应该使函数本身成为
suspend
,并将其冒泡到 Spring 控制器(如果您使用 webflux,它们确实支持
suspend
函数)。

© www.soinside.com 2019 - 2024. All rights reserved.