val ref = Source.actorRef[String](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
100000,
OverflowStrategy.dropNew
).to(Sink.foreachAsync(1){ elem =>
// how to reply to sender
Future.successful()
})
上面的示例几乎满足了我的需要,但底层消息不知道发送者。所以无法回复。有没有一种方法或模式可以让我回复发件人,以便它可以与询问模式一起使用,例如:
import akka.pattern.ask
(ref ? "request").onComplete {
case Failure(exception) => logger.error(s"Couldn't receive response", exception)
case Success(value) => logger.info(s"Received response ${value}")
}
这对于经典的基于 actor 的 API 和
Source.actorRef
来说是不可能的,正如您所注意到的,它会丢弃发送者。
但是,如果使用 Akka 2.6+,可以将类型化 ask 模式 与
akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
结合使用,以将发送 actor(在本例中为询问的合成 actor)包含在发送到 Source.actorRef
的消息中。
对于您的示例,您可以将流重写为:
import akka.actor.typed.{ ActorRef => TypedActorRef }
val ref = Source.actorRef[(String, TypedActorRef[Any])](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
100000,
OverflowStrategy.dropNew
).to(Sink.foreach { // Sink.foreachAsync(1) isn't doing anything here...
case (elem, sender) =>
// do stuff with elem?
sender ! "response"
}
然后询问
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Scheduler
import akka.actor.typed.scaladsl.adapter.{ ClassicActorRefOps, ClassicActorSystemOps }
// actorSystem is the classic ActorSystem in use
implicit val scheduler: Scheduler = actorSystem.toTyped.scheduler
ref.ask[Any]("request" -> _).onComplete {
case Failure(exception) => logger.error(s"Couldn't receive response", exception)
case Success(value) => logger.info(s"Received response ${value}")
}