使用Futures,akka流和akka演员在并发环境中与非线程安全服务集成,同时保持背压

问题描述 投票:0回答:1

我正在使用第三方库来提供解析服务(在我的情况下是用户代理解析),它不是一个线程安全的库,必须在单线程的基础上运行。我想编写一个线程安全的API,可以被多个线程调用,通过Futures API与它进行交互,因为库可能会引入一些潜在的阻塞(IO)。我还想在必要时提供背压,并在解析器无法赶上生产者时返回失败的未来。

它实际上可能是一个通用的要求/问题,如何与任何非线程安全的客户端/库(用户代理/地理位置解析器,像redis这样的数据库客户端,像流利的伐木工人收集器一样)与并发环境中的背压进行交互。

我想出了以下公式:

  1. 将解析器封装在专用的Actor中。
  2. 创建一个接收ParseReuqest的akka​​流源队列,该队列包含用户代理和要完成的Promise,并通过mapAsync使用ask模式与解析器actor进行交互。
  3. 创建另一个actor来封装源队列。

这是要走的路吗?有没有其他方法可以实现这一点,也许更简单?也许使用图形阶段?可以在没有问题模式和更少的代码的情况下完成吗?

3号中提到的actor是因为我不确定源队列是否是线程安全的?我希望它只是在文档中说明,但事实并非如此。网络上有多个版本,有些说明不是,有些说明是。

源队列一旦实现,是否可以线程安全地从不同的线程推送元素?

(代码可能无法编译,并且容易出现潜在的失败,并且仅用于此问题)

class UserAgentRepo(dbFilePath: String)(implicit actorRefFactory: ActorRefFactory) {

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val askTimeout = Timeout(5 seconds)

// API to parser - delegates the request to the back pressure actor
def parse(userAgent: String): Future[Option[UserAgentData]] = {
  val p = Promise[Option[UserAgentData]]
  parserBackPressureProvider ! UserAgentParseRequest(userAgent, p)
  p.future
}

// Actor to provide back pressure that delegates requests to parser actor
private class ParserBackPressureProvider extends Actor {
  private val parser = context.actorOf(Props[UserAgentParserActor])

  val queue = Source.queue[UserAgentParseRequest](100, OverflowStrategy.dropNew)
    .mapAsync(1)(request => (parser ? request.userAgent).mapTo[Option[UserAgentData]].map(_ -> request.p))
    .to(Sink.foreach({
      case (result, promise) => promise.success(result)
    }))
    .run()

  override def receive: Receive = {
    case request: UserAgentParseRequest => queue.offer(request).map {
      case QueueOfferResult.Enqueued =>
      case _ => request.p.failure(new RuntimeException("parser busy"))
    }
  }
}

// Actor parser
private class UserAgentParserActor extends Actor {
  private val up = new UserAgentParser(dbFilePath, true, 50000)
  override def receive: Receive = {
    case userAgent: String =>
      sender ! Try {
      up.parseUa(userAgent)
    }.toOption.map(UserAgentData(userAgent, _))
  }
}

private case class UserAgentParseRequest(userAgent: String, p: Promise[Option[UserAgentData]])

private val parserBackPressureProvider = actorRefFactory.actorOf(Props[ParserBackPressureProvider])

}
scala akka akka-stream
1个回答
0
投票

你必须为此使用演员吗?

看起来你并不需要所有这些复杂性,scala / java拥有你需要的所有工具“开箱即用”:

 class ParserFacade(parser: UserAgentParser, val capacity: Int = 100) {
    private implicit val ec = ExecutionContext
      .fromExecutor(
         new ThreadPoolExecutor(
           1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(capacity)
         )
      )

   def parse(ua: String): Future[Option[UserAgentData]] = try {
     Future(Some(UserAgentData(ua, parser.parseUa(ua)))
       .recover { _ => None }
   } catch {
     case _: RejectedExecutionException => 
       Future.failed(new RuntimeException("parser is busy"))
   }
}
© www.soinside.com 2019 - 2024. All rights reserved.