演员模型:一个演员需要来自另一个演员的信息

问题描述 投票:0回答:1

关于演员模型我不了解的是这个。假设你有两个演员。他们收集和管理来自各种来源的数据。这些源通过其收件箱/邮箱/队列与演员互动。例如,演员A收集信号,而演员B管理整个系统的信息。

在某些时候,演员A必须处理其数据。在处理过程中,它不能做太多其他事情。例如,无法处理新信号。作为数据处理的一部分,在A可以继续之前,演员A需要来自演员B的信息。

在伪代码中:

functionOfActorA()
{
  internalQueue {
   ...doing stuff with our data
   info = actor_B.getInfo() -> what should happen here?
   ...doing stuf with our data and the obtained info
   }
}

getInfo()//function of actor B
{
  internalQueue {
    ...prepare requested info
    ... -> what should happen here?
  }
} 

如果两个actor都应该在自己的队列或线程上独立运行,那么如何根据actor的请求从一个actor到另一个actor获取信息?

language-agnostic actor
1个回答
3
投票

Actor A可以通过向Actor B发送请求来请求信息,但是响应将返回到一个消息中,Actor A将在以后的某个时间点接收该消息。保存此信息有两种方法:

  1. Actor A可以在内部存储信息以等待该响应。
  2. Actor A可以将信息附加到它发送给Actor B的消息,而Actor B返回该上下文(否则忽略)。

当Actor A从Actor B获取信息时,它可以继续处理。

这是第一种实现方式的示例,使用Thespian Python actor库(http://thespianpy.com):

class ActorA(Actor):

    def __init__(self, *args, **kw):
        super(ActorA, self).__init__(args, kw)
        self.datalist = []

    def receiveMessage(self, msg, sender):

        if isinstance(msg, ActorAddress):
            self.actorB = msg

        elif isinstance(msg, WorkRequest):
            x = got_some_data(msg)
            self.datalist.append(x)
            self.send(self.actorB, NeedInfo())

        elif isinstance(msg, Info):
            x = self.datalist.pop()
            process_data(x, msg)

class ActorB(Actor):

    def receiveMessage(self, msg, sender):

        if isinstance(msg, NeedInfo):
            i = get_info(msg)
            self.send(sender, i)

上面显示了将工作内部存储到actor以后续继续的基本功能。有几个注意事项:

  • 如果要存储多个数据项,则ActorA需要某种方式来确定来自ActorB的信息适用于哪个项目。
  • ActorA需要处理它还不知道ActorB的地址的情况。
  • ActorA应该使用某种超时来避免永远保持其内部列表的工作(如果某种方式,ActorB永远不会响应)。
  • 如果ActorA退出或死亡,它应该在退出之前对仍在datalist上的项执行适当的操作。

这是第二个实现样式的相应简单示例:

class ActorA(Actor):

    def receiveMessage(self, msg, sender):

        if isinstance(msg, ActorAddress):
            self.actorB = msg

        elif isinstance(msg, WorkRequest):
            x = got_some_data(msg)
            self.send(self.actorB, NeedInfo(x))

        elif isinstance(msg, Info):
            x = msg.orig_request.data
            process_data(x, msg)

class ActorB(Actor):

    def receiveMessage(self, msg, sender):

        if isinstance(msg, NeedInfo):
            i = getinfo(msg)
            i.orig_request = msg
            self.send(sender, i)

在第二个示例中,创建NeedInfo的调用将数据存储在NeedInfo对象的.data字段中,并且由ActorB传回的Info具有附加到其上的原始NeedInfo消息。这种风格的考虑因素是:

  • 无需将原始WorkRequest消息独立地关联到相应的Info,因为它们彼此连接。
  • ActorA更依赖于ActorB的实现,期望ActorB将原始消息附加到其响应,以允许ActorA恢复此上下文。
  • ActorA仍然需要处理它还不知道ActorB的地址的情况。
  • 如果ActorA退出或死亡,它在清理之前没有记录这个未完成的工作(尽管死信处理程序可以执行此角色)。

上面的例子相当简单,其他Actor Model库实现会有一些变化,但这些一般技术应该广泛适用,无论库/语言如何。这两种实现方式都遵循以下的一般Actor指南:

  1. 收到消息后,可以进行哪些工作
  2. 根据需要更新内部状态以处理下一条消息
  3. 退出消息处理程序

虽然可以编写执行阻塞操作的Actor,但该操作会干扰Actor的响应能力和处理其他消息的能力(正如您正确指出的那样),因此尽可能使用消息驱动的延续而不是阻塞调用。

© www.soinside.com 2019 - 2024. All rights reserved.