什么是Akka类型的pipeTo?

问题描述 投票:1回答:3

我目前正在尝试将现有的无类型演员重写为键入的演员。由于演员正在使用ScalikeJDBC与MySQL数据库交谈,并且由于我希望异步完成,我正在处理来自单独(非演员)存储库类的Futures。

对于无类型的Akka,在演员的接收方法中,我可以这样做:

import akka.pattern.pipe
val horseList : Future[Seq[Horse]] = horseRepository.listHorses(...)
horseList pipeTo sender()

发送者演员最终会收到一份马匹清单。我无法弄清楚如何在行为中执行此操作,例如:

val behaviour : Behavior[ListHorses] = Behaviors.receive { 
    (ctx,msg) => msg match {
        case ListHorses(replyTo) => 
            val horseListF : Future[Seq[Horse]] = horseRepository.listHorses(...)
            // -> how do I make horseListF's content end up at replyTo? <-
            Behaviors.same
    }
}

管道模式不起作用(因为它需要一个无类型的ActorRef),到目前为止,我还没有在akka-actor-typed(2.5.12)依赖项中找到任何其他东西,我正在使用它来完成这项工作。

我该怎么做呢?

scala akka akka-typed
3个回答
0
投票

为什么不使用未来的回调来发送回信息?检查一下这个例子,也许你可以使用simliar近似:

import akka.NotUsed
import akka.typed.{ActorRef, ActorSystem, Behavior}
import akka.typed.scaladsl.Actor

import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global

sealed trait Response
case class Message(msg: String) extends Response

case class Greet(whom: String, replayTo: ActorRef[Response])

object Methods {
  def GetRecipient : Future[String] = Future { "Me" }
}

object Greeter {
  import Methods._
  import akka.typed.scaladsl.Actor

  val behavior =
    Actor.immutable[Greet] { (ctx, greet) =>
      println(greet)
      GetRecipient onComplete {
        case Success(str) => {
          // Use the future call back instad the pipeTo
          greet.replayTo ! Message("Hi!")
        }
        case Failure(err) => greet.replayTo ! Message("Error")
      }
      Actor.same
    }
}

object Man extends App {

  import Greeter._
  import scala.concurrent.duration._

  val main: Behavior[Response] = {
    Actor.deferred[Response] { ctx =>
      val enricherRef = ctx.spawn(behavior, "greet")
      enricherRef ! Greet("hey", ctx.self)

      Actor.immutable[Response] {
        case (ctx, m: Response) => {
          println(m)
          Actor.same
        }
      }
    }
  }

  val system = ActorSystem( "GreetDemo", main)

  Thread.sleep(5000)
}

此示例仅向新生成的actor发送消息,但在您的情况下,我会为每个查询使用新的actor。


0
投票

您可以在未来成功完成时向replyTo发送消息:

case ListHorses(replyTo) => 
    horseRepository.listHorses(...) foreach { horses => replyTo ! horses }
    Behaviors.same

或者,如果您还想处理错误:

case ListHorses(replyTo) =>
    horseRepository.listHorses(...) onComplete { 
        case Success(horses) => replyTo ! horses
        case Failure(e) => // error handling 
    }
    Behaviors.same

为了使这个工作,你需要一个ExecutionContext。通常使用与actor相同的一个是有意义的,所以你必须首先将它提供给onCompleteforeach

implicit val ec = ctx.executionContext

0
投票

在Akka 2.5.22(也许更早)有context.pipeToSelf

  def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit

你仍然需要为SuccessFailure提供一个模式匹配,在我的代码中我用这个糖减少了:

def mapPipe[A, T](success: A => T, failure: Throwable => T): Try[A] => T = {
  case Success(value) => success(value)
  case Failure(e) => failure(e)
}

导致这样的通话:

case class Horses(horses: Seq[Horse]) extends Command
case class HorseFailure(e: Throwable) extends Command

...

context.pipeToSelf(horseList) {
  mapPipe(Horses,HorseFailure)
}
© www.soinside.com 2019 - 2024. All rights reserved.