我正在开发一个小型参与者系统,当参与者层次结构中出现问题时,我希望关闭所有参与者并重新启动顶层参与者,因此一切都从干净的状态开始。
我可以很容易地做到这一点,在中间参与者中实施监督策略来升级错误,然后让顶级参与者重新启动。然而,正如此视频中所述,重新启动不会清除演员邮箱,我想这样做以真正确保一切重新开始干净。
到目前为止,我还没有找到任何有关如何清除邮箱的文档,所以不确定是否可能。
但是,由于我不想重新启动,而是有一些延迟,所以我研究了
BackoffSupervisor
。据我了解,当一个actor被BackoffSupervisor
重新启动时,它实际上并没有重新启动,而是在退避期间停止,然后重新启动,这意味着它的邮箱实际上被清除了,正如我想要的那样。这个解决方案对我有用,但我想确保这种行为是故意的,并且我可以在这个用例中依赖它。
可能不是超级相关,但仅供参考,以防万一,这就是我创建这个主管的方式:
return BackoffSupervisor.Props(
Backoff.OnFailure(
rootActorProps,
childName: nameof(RootActor),
minBackoff: TimeSpan.FromSeconds(1),
maxBackoff: TimeSpan.FromMinutes(5),
randomFactor: 0.2,
-1));
所以,总而言之,我的疑问是:
BackoffSupervisor
一种有效的方法来获取邮箱,并且其他一切都以干净的状态重新启动这是我编写的 LINQPad 脚本 - 请忽略
ActorSytem
终止内容;我只是需要它来确保程序停止运行;)
async Task Main()
{
// Initialize Actor System
var system = ActorSystem.Create("MySystem");
// Create Props for the EchoActor
var props = Props.Create<EchoActor>();
// Create the EchoActor
var echoActor = system.ActorOf(props, "echoActor");
// Send 10 messages to the EchoActor
for (int i = 0; i < 10; i++)
{
echoActor.Tell($"Message {i + 1}");
}
// Allow some time for messages to process before shutting down
await system.WhenTerminated;
}
// Define the EchoActor class
public class EchoActor : ReceiveActor
{
public EchoActor()
{
NormalBehavior();
}
private void NormalBehavior()
{
ReceiveAnyAsync(async message =>
{
await Task.Delay(100);
Console.WriteLine($"Received and echoing back: {message}");
throw new ApplicationException("restart");
});
}
private void RecoverBehavior(){
Receive<RecoverMarker>(_ =>{
Become(NormalBehavior);
Context.System.Terminate();
});
ReceiveAny(_ =>{
// discard
Console.WriteLine($"Discarding: {_}");
});
}
private class RecoverMarker{}
protected override void PreRestart(Exception ex, object message){
Self.Tell(new RecoverMarker());
}
protected override void PostRestart(Exception ex){
Become(RecoverBehavior);
}
}
此示例的工作原理:它使用行为切换将 Actor 在重新启动后立即移至“丢弃”模式,并且在销毁 Actor 的旧实例时,将“重启标记”提前推送到 Actor 的邮箱中。演员将丢弃所有较旧的消息,直到它达到
RestartMarker
。
这是解决此问题的通用方法,无需接触内部邮箱 API 等即可工作。