我正在使用第三方库来提供解析服务(在我的情况下是用户代理解析),它不是一个线程安全的库,必须在单线程的基础上运行。我想编写一个线程安全的API,可以被多个线程调用,通过Futures API与它进行交互,因为库可能会引入一些潜在的阻塞(IO)。我还想在必要时提供背压,并在解析器无法赶上生产者时返回失败的未来。
它实际上可能是一个通用的要求/问题,如何与任何非线程安全的客户端/库(用户代理/地理位置解析器,像redis这样的数据库客户端,像流利的伐木工人收集器一样)与并发环境中的背压进行交互。
我想出了以下公式:
这是要走的路吗?有没有其他方法可以实现这一点,也许更简单?也许使用图形阶段?可以在没有问题模式和更少的代码的情况下完成吗?
3号中提到的actor是因为我不确定源队列是否是线程安全的?我希望它只是在文档中说明,但事实并非如此。网络上有多个版本,有些说明不是,有些说明是。
源队列一旦实现,是否可以线程安全地从不同的线程推送元素?
(代码可能无法编译,并且容易出现潜在的失败,并且仅用于此问题)
class UserAgentRepo(dbFilePath: String)(implicit actorRefFactory: ActorRefFactory) {
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val askTimeout = Timeout(5 seconds)
// API to parser - delegates the request to the back pressure actor
def parse(userAgent: String): Future[Option[UserAgentData]] = {
val p = Promise[Option[UserAgentData]]
parserBackPressureProvider ! UserAgentParseRequest(userAgent, p)
p.future
}
// Actor to provide back pressure that delegates requests to parser actor
private class ParserBackPressureProvider extends Actor {
private val parser = context.actorOf(Props[UserAgentParserActor])
val queue = Source.queue[UserAgentParseRequest](100, OverflowStrategy.dropNew)
.mapAsync(1)(request => (parser ? request.userAgent).mapTo[Option[UserAgentData]].map(_ -> request.p))
.to(Sink.foreach({
case (result, promise) => promise.success(result)
}))
.run()
override def receive: Receive = {
case request: UserAgentParseRequest => queue.offer(request).map {
case QueueOfferResult.Enqueued =>
case _ => request.p.failure(new RuntimeException("parser busy"))
}
}
}
// Actor parser
private class UserAgentParserActor extends Actor {
private val up = new UserAgentParser(dbFilePath, true, 50000)
override def receive: Receive = {
case userAgent: String =>
sender ! Try {
up.parseUa(userAgent)
}.toOption.map(UserAgentData(userAgent, _))
}
}
private case class UserAgentParseRequest(userAgent: String, p: Promise[Option[UserAgentData]])
private val parserBackPressureProvider = actorRefFactory.actorOf(Props[ParserBackPressureProvider])
}
你必须为此使用演员吗?
看起来你并不需要所有这些复杂性,scala / java拥有你需要的所有工具“开箱即用”:
class ParserFacade(parser: UserAgentParser, val capacity: Int = 100) {
private implicit val ec = ExecutionContext
.fromExecutor(
new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(capacity)
)
)
def parse(ua: String): Future[Option[UserAgentData]] = try {
Future(Some(UserAgentData(ua, parser.parseUa(ua)))
.recover { _ => None }
} catch {
case _: RejectedExecutionException =>
Future.failed(new RuntimeException("parser is busy"))
}
}