我们在应用程序中使用Akka演员。参与者正在调用服务,在某些情况下,这些服务正在调用第三方公开的其余API。现在,第三方API需要很长时间才能做出响应。因此,在高峰时间,由于线程在等待客户端API响应时被阻塞,因此系统吞吐量会受到影响。
有时在高峰时间,因为线程正在等待,所以一旦阻塞的线程可用,邮件就会在akka邮箱中长时间等待并被接收。
我正在寻找一种解决方案,可以提高系统的吞吐量并释放线程,以便参与者消息可以开始处理。
我正在考虑使用Future将其余的API调用从阻塞调用更改为非阻塞的调用,并为此类其余的API调用创建专用的参与者。 Actor会定期检查future是否完成,然后发送完成消息,然后其余过程可以继续。这样,阻塞线程的数量将减少,可用于actor消息处理。
另外,对于正在执行阻塞操作的参与者,也希望具有单独的执行上下文。到目前为止,我们已经有了全局执行上下文。
需要进一步的输入。
首先,正确的是,在处理消息时不应阻塞,因此您需要启动异步进程并立即从消息处理程序中返回。然后,Actor可以在过程完成时发送另一条消息。
[您没有说您正在使用哪种技术来进行REST API调用,但是Akka HTTP之类的库在发出HTTP请求时将返回Future
,因此您无需在其周围添加自己的Future
。例如:
def getStatus(): Future[Status] =
Http().singleRequest(HttpRequest(uri = "http://mysite/status")).flatMap {
res =>
Unmarshal(res).to[Status]
}
如果由于某种原因正在使用同步库,请将调用包装在Future(blocking{ ??? })
中,以通知执行上下文需要创建新线程。除非需要控制此操作的线程调度,否则不需要单独的执行上下文。
您也不需要轮询Future
,只需添加一个onComplete
处理程序,该处理程序会将内部消息发送回您的actor,然后由其处理API调用的结果。