考虑以下代码:
def getFlow()(implicit appConfig: AppConfig, actorSystem: ActorSystem): Flow[BrandSafetyServiceRequest, HttpResponse, NotUsed] = {
implicit val brandSafetyServiceRequestCodec: JsonValueCodec[BrandSafetyServiceRequest] = JsonCodecMaker.make
implicit val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher
val parallelism = Runtime.getRuntime.availableProcessors()
val flow = Flow[BrandSafetyServiceRequest].mapAsyncUnordered(parallelism) { request =>
Http().singleRequest(
Post(appConfig.brandSafetyPhoenixServiceEndpoint).withEntity(HttpEntity(MediaTypes.`application/json`, writeToString(request)))
)
}
val backpressure = RetryFlow.withBackoff(minBackoff = 15.seconds, maxBackoff = 30.minutes, randomFactor = 0d, maxRetries = 5, flow)(
decideRetry = {
case (request, HttpResponse(statusCode, _, _, _)) =>
statusCode match {
case StatusCodes.OK | StatusCodes.BadRequest => None
case _ => Some(request)
}
}
)
Flow[BrandSafetyServiceRequest].via(backpressure)
}
我想优化 CPU 利用率并最大化吞吐量。目前,我没有覆盖默认调度程序或任何 HTTP Akka 配置。我正在使用 Runtime.getRuntime.availableProcessors() 的并行性。
源是有限的,而汇则什么也不做。目前,我使用了大约 1/8 的机器 CPU。
问题是:
Slick.source("databricks")
的请求。如果您可以更新代码,那就太好了。
谢谢!
注意:
appConfig.brandSafetyPhoenixServiceEndpoint
是一个带有路径的 Kubernetes DNS,当后端不可用时,我正在等待对背压的 HTTP 响应,这就是我等待响应的原因
正如 Gael 所指出的,您的流是 I/O 绑定的(无论是从 Slick 源获取数据还是发送 HTTP 请求和等待响应)。
将并行度绑定到可用处理器(或者实际上比该处理器少 1 或 2 个)的建议仅适用于 CPU 密集型任务;这似乎是一个允许比这更大的并行性的应用程序。
至于并行度提高多少,一种方法是引入并行度因子来乘以可用处理器。迭代地进行基准测试,如果一次运行显示利用率为 1/n,则将前一个因子乘以 n;继续这样做,直到 CPU 利用率的提高表现出并行性增加带来的回报明显减少(这种回报减少表明其他因素是瓶颈:基于流的遥测(例如我雇主的产品)在这里可能会有所帮助;合理的启发式达到收益递减点可能类似于“将并行度增加 x 倍,将吞吐量增加到小于 sqrt(x) 倍”)。
作为旁注,我希望您了解
RestartFlow
关于失败的语义(特别是对于缓冲的流,如 mapAsync
系列运算符所做的那样):已传递给包装流的元素将不会如果包装流失败则重试。换句话说,您的流很可能最多处理一次来自 Slick 源的元素。