我有一个使用6个角色的C#Akka.net项目。一个参与者(LoggingActor)负责获取对象并将其传输到Web服务器。
[每当LoggingActor遇到网络故障时,似乎都会困扰我的整个actor系统;一切停滞不前,直到LoggingActor完成其方法。
LoggingActor中的阻止方法包含以下内容:
try
{
var rs = new RemoteServer();
var rsr = rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.Result;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
并且函数SendMovement看起来像这样:
public async Task<HttpStatusCode> SendMovement(Movement movement, string siteToken, string serverUrl)
{
//Get remote server
var createMovementUrl = serverUrl + MovementsRoute + "?site_token=" + siteToken;
var jsonString = "{ \"movement\": { \"approach\": \"" + movement.Approach + "\", \"exit\": \"" + movement.Exit + "\", \"turntype\": \"" + movement.TurnType + "\", \"objectclass\": \"" + movement.TrafficObjectType + "\", \"created_at\": " + movement.Timestamp.Ticks + "} }";
var content = new StringContent(jsonString, Encoding.UTF8, "application/json");
//Transmit
var response = await Client.PostAsync(createMovementUrl, content);
return response.StatusCode;
}
LoggingActor的目的是尝试将我的对象发布到服务器,并在初始尝试失败时缓冲它们以便稍后发送。对于我的应用程序,它们以什么顺序传输都没有关系。
如何修改LoggingActor或SendMovement函数以允许LoggingActor'掉入'LoggingActor的消息处理功能通过在等待响应之前发出POST请求来进行]
我认为我想要的是服务器的POST响应(或异常),以触发另一个消息给参与者,该消息告诉参与者将其从发送队列中删除或保留(即不执行任何操作。]
您正在调用rs.SendMovement
,这将返回一个Task,而您正在其上调用Result
属性。这将锁定整个线程驻留在当前线程池中-因为线程池在使用它的所有参与者/任务之间共享,并且通常每个CPU内核包含1个工作线程。这意味着,您已经有效地关闭了整个CPU内核,直到远端响应(或发生超时)为止。
通常,使用异步代码时,永远不要阻塞线程。在Akka.NET中,可以通过2种方式之一来使用基于任务的API。
ReceiveAsync
第一个解决方案是使您的actor从ReceiveActor
继承并使用ReceiveAsync
方法定义您的消息处理程序:
class MyActor : ReceiveActor
{
public MyActor()
{
ReceiveAsync<MyMessage>(async message => {
try
{
var rs = new RemoteServer();
var rsr = await rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl);
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
});
}
}
这将创建一个称为non-reentrant actor的东西-这意味着,尽管您不会阻塞任何基础线程(这样它仍可被其他同时工作的actor使用),但您将阻塞当前actor ,以防止它在当前异步lambda处理程序到达末尾之前接收任何其他消息。
PipeTo
另一种方法-默认在几乎所有的akka内部体系结构中使用-在任务上使用PipeTo
方法:
class MyActor : ActorBase
{
// start with an actor in ready state
protected override bool Receive(object message) => Ready(message);
bool Ready(object message)
{
switch (message)
{
case MyMessage msg:
var rs = new RemoteServer();
rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.PipeTo(Self,
success: rsr => new Status.Success(rsr),
failure: ex => new Status.Failure(ex));
Become(Waiting);
return true;
default: return false;
}
}
bool Waiting(object message)
{
switch (message)
{
case Status.Success success:
var rsr = (HttpStatusCode)success.Status;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
Become(Ready);
return true;
case Status.Failure failure:
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
Become(Ready);
return true;
default: return false;
}
}
}
通过这种方式,您可以将演员变成自定义状态机。这里负责等待逻辑的代码分为两个步骤-在这里表示为Ready
/ Waiting
状态。发送异步请求后,actor将更改其行为-这意味着他现在可以处理不同的传入消息集或对它们进行不同的反应。返回的布尔值通知actor系统消息是否由当前actor处理-这可能会触发未处理的调用,默认情况下将记录未处理的消息。
该方法的优点之一是该参与者是reentrant-这意味着,在等待SendMovement
返回结果(或失败)的同时,该参与者可以自由地处理和处理其他消息。