优化 HTTP 上 Akka / Pekko 流的 CPU 利用率和吞吐量

问题描述 投票:0回答:1

考虑以下代码:

def getFlow()(implicit appConfig: AppConfig, actorSystem: ActorSystem): Flow[BrandSafetyServiceRequest, HttpResponse, NotUsed] = {

    implicit val brandSafetyServiceRequestCodec: JsonValueCodec[BrandSafetyServiceRequest] = JsonCodecMaker.make
    implicit val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher

    val parallelism = Runtime.getRuntime.availableProcessors()
    val flow = Flow[BrandSafetyServiceRequest].mapAsyncUnordered(parallelism) { request =>
      Http().singleRequest(
        Post(appConfig.brandSafetyPhoenixServiceEndpoint).withEntity(HttpEntity(MediaTypes.`application/json`, writeToString(request)))
      )
    }

    val backpressure = RetryFlow.withBackoff(minBackoff = 15.seconds, maxBackoff = 30.minutes, randomFactor = 0d, maxRetries = 5, flow)(
      decideRetry = {
        case (request, HttpResponse(statusCode, _, _, _)) =>
          statusCode match {
            case StatusCodes.OK | StatusCodes.BadRequest => None
            case _ => Some(request)
          }
      }
    )

    Flow[BrandSafetyServiceRequest].via(backpressure)
  }

我想优化 CPU 利用率并最大化吞吐量。目前,我没有覆盖默认调度程序或任何 HTTP Akka 配置。我正在使用 Runtime.getRuntime.availableProcessors() 的并行性。

源是有限的,而汇则什么也不做。目前,我使用了大约 1/8 的机器 CPU。

问题是:

  1. 是否可以选择不使用具有并行性的低级 API,而仅使用 Akka 配置?这是最佳实践吗?
  2. 如果不是,我应该使用什么并行度?该应用程序仅执行此流程,根本不占用 CPU 资源,只是等待响应并转发来自
    Slick.source("databricks")
    的请求。
  3. 我应该使用哪种 Akka 配置?也许增加主机连接?使用其他调度员?

如果您可以更新代码,那就太好了。
谢谢!

注意:

appConfig.brandSafetyPhoenixServiceEndpoint
是一个带有路径的 Kubernetes DNS,当后端不可用时,我正在等待对背压的 HTTP 响应,这就是我等待响应的原因

scala performance akka slick pekko
1个回答
0
投票

正如 Gael 所指出的,您的流是 I/O 绑定的(无论是从 Slick 源获取数据还是发送 HTTP 请求和等待响应)。

将并行度绑定到可用处理器(或者实际上比该处理器少 1 或 2 个)的建议仅适用于 CPU 密集型任务;这似乎是一个允许比这更大的并行性的应用程序。

至于并行度提高多少,一种方法是引入并行度因子来乘以可用处理器。迭代地进行基准测试,如果一次运行显示利用率为 1/n,则将前一个因子乘以 n;继续这样做,直到 CPU 利用率的提高表现出并行性增加带来的回报明显减少(这种回报减少表明其他因素是瓶颈:基于流的遥测(例如我雇主的产品)在这里可能会有所帮助;合理的启发式达到收益递减点可能类似于“将并行度增加 x 倍,将吞吐量增加到小于 sqrt(x) 倍”)。

作为旁注,我希望您了解

RestartFlow
关于失败的语义(特别是对于缓冲的流,如
mapAsync
系列运算符所做的那样):已传递给包装流的元素将不会如果包装流失败则重试。换句话说,您的流很可能最多处理一次来自 Slick 源的元素。

© www.soinside.com 2019 - 2024. All rights reserved.