从Supervisor重启后向actor发送消息

问题描述 投票:4回答:4

我正在使用BackoffSupervisor策略来创建一个必须处理某些消息的子actor。我想实现一个非常简单的重启策略,在异常的情况下:

  1. 孩子向主管宣传失败的消息
  2. 主管重新启动子节点并再次发送失败的消息。
  3. 主管在3次重试后放弃
  4. Akka持久性不是一种选择

到目前为止我所拥有的是:

主管定义:

val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = cmd.hashCode.toString,
    minBackoff = 1.seconds,
    maxBackoff = 2.seconds,
    randomFactor = 0.2 
  )
    .withSupervisorStrategy(
      OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
        case msg: MessageException => {
          println("caught specific message!")
          SupervisorStrategy.Restart
        }
        case _: Exception => SupervisorStrategy.Restart
        case _              ⇒ SupervisorStrategy.Escalate
      })
)

val sup = context.actorOf(supervisor)


sup ! cmd

应该发送电子邮件的子actor,但是失败(抛出一些Exception)并将Exception传播回supervisor:

class SenderActor() extends Actor {

  def fakeSendMail():Unit =  {
    Thread.sleep(1000)
    throw new Exception("surprising exception")
  } 

  override def receive: Receive = {
    case cmd: NewMail =>

      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case t => throw MessageException(cmd, t)
      }

  }
}

在上面的代码中,我将任何异常包装到自定义类MessageException中,该类传播到SupervisorStrategy,但是如何将它进一步传播给新子进程以强制重新处理?这是正确的方法吗?

编辑。我试图在preRestart钩子上向Actor重新发送消息,但不知何故钩子没有被触发:

class SenderActor() extends Actor {

  def fakeSendMail():Unit =  {
    Thread.sleep(1000)
    //    println("mail sent!")
    throw new Exception("surprising exception")
  }

  override def preStart(): Unit = {
    println("child starting")
  }


  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    reason match {
      case m: MessageException => {
        println("aaaaa")
        message.foreach(self ! _)
      }
      case _ => println("bbbb")
    }
  }

  override def postStop(): Unit = {
    println("child stopping")
  }

  override def receive: Receive = {
    case cmd: NewMail =>

      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case t => throw MessageException(cmd, t)
      }

  }
}

这给了我类似于以下输出的东西:

new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting

但没有来自preRestart挂钩的日志

scala akka actor
4个回答
5
投票

未调用子项的preRestart挂钩的原因是因为Backoff.onFailure在封面下使用BackoffOnRestartSupervisor,它使用与退避策略一致的停止和延迟启动行为替换默认重启行为。换句话说,当使用Backoff.onFailure时,当孩子重新启动时,孩子的preRestart方法不会被调用,因为基础主管实际上停止了孩子,然后再次启动它。 (使用Backoff.onStop可以触发孩子的preRestart钩子,但这与目前的讨论相关。)

当主管的子进程重新启动时,BackoffSupervisor API不支持自动重新发送消息:您必须自己实现此行为。重试消息的想法是让BackoffSupervisor的主管处理它。例如:

val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    ...
  ).withReplyWhileStopped(ChildIsStopped)
  ).withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
      case msg: MessageException =>
        println("caught specific message!")
        self ! Error(msg.cmd) // replace cmd with whatever the property name is
        SupervisorStrategy.Restart
      case ...
    })
)

val sup = context.actorOf(supervisor)

def receive = {
  case cmd: NewMail =>
    sup ! cmd
  case Error(cmd) =>
    timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
    // We assume that NewMail has an id field. Also, adjust the time as needed.
  case Replay(cmd) =>
    sup ! cmd
  case ChildIsStopped =>
    println("child is stopped")
}

在上面的代码中,嵌入在NewMail中的MessageException消息被包装在一个自定义的case类中(为了很容易将它与“normal”/ new NewMail消息区分开来)并发送到self。在这种背景下,self是创造BackoffSupervisor的演员。这个封闭的演员然后使用single timer在某个时刻重播原始消息。这个时间点应该足够远,以至于BackoffSupervisor可能会耗尽SenderActor的重新启动尝试,以便孩子在收到重新发送的消息之前可以有充分的机会进入“良好”状态。显然,无论子重启次数如何,此示例仅涉及一次重发消息。


另一个想法是为每个BackoffSupervisor消息创建一个SenderActor-NewMail对,并让SenderActorNewMail钩子中将preStart消息发送给自己。这种方法的一个问题是清理资源;即,当处理成功或孩子重新开始时,关闭BackoffSupervisors(这将关闭他们各自的SenderActor孩子)。 NewMail ids到(ActorRef, Int)元组的地图(其中ActorRef是对BackoffSupervisor演员的引用,而Int是重启尝试的次数)在这种情况下会有所帮助:

class Overlord extends Actor {

  var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long

  def receive = {
    case cmd: NewMail =>
      val childProps = Props(new SenderActor(cmd, self))
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          ...
        ).withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
            case msg: MessageException =>
              println("caught specific message!")
              self ! Error(msg.cmd)
              SupervisorStrategy.Restart
            case ...
          })
      )
      val sup = context.actorOf(supervisor)
      state += (cmd.id -> (sup, 0))

    case ProcessingDone(cmdId) =>
      state.get(cmdId) match {
        case Some((backoffSup, _)) =>
          context.stop(backoffSup)
          state -= cmdId
        case None =>
          println(s"${cmdId} not found")
      }

    case Error(cmd) =>
       val cmdId = cmd.id
       state.get(cmdId) match {
         case Some((backoffSup, numRetries)) =>
           if (numRetries == 3) {
             println(s"${cmdId} has already been retried 3 times. Giving up.")
             context.stop(backoffSup)
             state -= cmdId
           } else
             state += (cmdId -> (backoffSup, numRetries + 1))
         case None =>
           println(s"${cmdId} not found")
       }

    case ...
  }
}

请注意,上例中的SenderActorNewMailActorRef作为构造函数参数。后一个参数允许SenderActor向封闭的actor发送自定义的ProcessingDone消息:

class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
  override def preStart(): Unit = {
    println(s"child starting, sending ${cmd} to self")
    self ! cmd
  }

  def fakeSendMail(): Unit = ...

  def receive = {
    case cmd: NewMail => ...
  }
}

显然,SenderActor在每次使用fakeSendMail时都会失败。我将留下SenderActor所需的其他更改来实现快乐路径,其中SenderActorProcessingDone发送target消息给你。


0
投票

失败的子actor可以作为主管策略中的发件人使用。引用https://doc.akka.io/docs/akka/current/fault-tolerance.html#creating-a-supervisor-strategy

如果策略是在监督actor内声明的(而不是在伴随对象中),则其决策者可以以线程安全的方式访问actor的所有内部状态,包括获取对当前失败的子级的引用(作为发送者可用)失败的消息)。


0
投票

在@chunjef提供的良好解决方案中,他警告在退避监督员启动工作人员之前重新安排工作的风险

然后,这个封闭的actor使用单个计时器在某个时刻重放原始消息。此时间点应该足够远,以便BackoffSupervisor可能会耗尽SenderActor的重启尝试,以便孩子在收到重新发送的消息之前可以有充分的机会进入“良好”状态。

如果发生这种情况,该方案将成为死信,并且不会进一步取得进展。我用this scenario简化了小提琴。

因此,计划延迟应该大于maxBackoff,这可能代表工作完成时间的影响。避免这种情况的一种可能的解决方案是让工人演员在准备工作时向父亲发送消息,例如here


-1
投票

在您的情况下,使用某些第三方软件发送电子邮件是一项危险的操作。为什么不应用Circuit Breaker模式并完全跳过发送者演员?此外,你仍然可以在其中有一个演员(有一些Backoff Supervisor)和Circuit Breaker(如果这对你有意义的话)。

© www.soinside.com 2019 - 2024. All rights reserved.