我正在为个人项目开发基于 SocketAsyncEventArgs 的基于任务的包装器。到目前为止,除了任务取消之外,一切都正常。在实施过程中,我遇到了一个问题,即对如何以正确的方式实施取消缺乏了解。
假设我有任务返回的 ReceiveAsync 和 ReceiveFromAsync 方法,它们采用具有以下签名的 CancellationToken:
public Task<int> ReceiveAsync(SocketAsyncEventArgs e, CancellationToken cancellationToken) {
cancellationToken.Register(state => {
// do something to cancel the operation when the cancellationToken is signalled
}, someState);
bool completedAsync = socket.ReceiveAsync(e);
// ...
}
public Task<int> ReceiveFromAsync(SocketAsyncEventArgs e, CancellationToken cancellationToken) {
cancellationToken.Register(state => {
// do something to cancel the operation when the cancellationToken is signalled
}, someState);
bool completedAsync = socket.ReceiveFromAsync(e);
// ...
}
此外,假设我有 SendAsync 和 SendToAsync 方法,还具有具有以下签名的 CancellationToken:
public Task<int> SendAsync(SocketAsyncEventArgs e, CancellationToken cancellationToken) {
cancellationToken.Register(state => {
// do something to cancel the operation when the cancellationToken is signalled
}, someState);
bool completedAsync = socket.SendAsync(e);
// ...
}
public Task<int> SendToAsync(SocketAsyncEventArgs e, CancellationToken cancellationToken) {
cancellationToken.Register(state => {
// do something to cancel the operation when the cancellationToken is signalled
}, someState);
bool completedAsync = socket.SendToAsync(e);
// ...
}
SocketAsyncEventArgs.Completed 事件处理程序如下所示。假设到目前为止 SocketAsyncEventArgs 处理程序的实现正确。唯一缺少的是取消:
public void HandleIoCompleted(object sender, SocketAsyncEventArgs args) {
switch (args.LastOperation) {
case SocketAsyncOperation.Receive:
switch (args.SocketError) {
case SocketError.Success:
// ...
break;
case SocketError.OperationAborted:
Debug.WriteLine("CompleteReceive experienced SocketError.OperationAborted!");
break;
default:
// ...
break;
}
break;
case SocketAsyncOperation.ReceiveFrom:
switch (args.SocketError) {
case SocketError.Success:
// ...
break;
case SocketError.OperationAborted:
Debug.WriteLine("CompleteReceiveFrom experienced SocketError.OperationAborted!");
break;
default:
// ...
break;
}
break;
case SocketAsyncOperation.Send:
switch (args.SocketError) {
case SocketError.Success:
// ...
break;
case SocketError.OperationAborted:
Debug.WriteLine("CompleteSend experienced SocketError.OperationAborted!");
break;
default:
// ...
break;
}
break;
case SocketAsyncOperation.SendTo:
switch (args.SocketError) {
case SocketError.Success:
// ...
break;
case SocketError.OperationAborted:
Debug.WriteLine("CompleteSendTo experienced SocketError.OperationAborted!");
break;
default:
// ...
break;
}
break;
default:
throw new NotSupportedException($"{nameof(HandleIoCompleted)} doesn't support {args.LastOperation}");
}
}
通过查看有关此主题的其他问题,我了解到要取消 Socket.ReceiveAsync 或 Socket.ReceiveFromAsnc 操作,我需要处理底层套接字。然而,这会干扰任何正在进行的异步发送操作,当套接字被处置时,这些操作将以 SocketError.OperationAborted 结束。同样,取消任何 Socket.SendAsync 或 Socket.SendToAsync 操作都会干扰当前正在进行的接收操作,在处置套接字时也会遇到 SocketError.OperationAborted。
以安全方式取消操作的正确方法是什么,以便 UDP 和 TCP 流量可以在数据包丢失后恢复(如果没有取消,则 receiveFromAsync 和 receiveAsync 调用永远不会调用 e.Completed)?
如果需要额外的代码,只需添加一条注释,我会尽可能充实代码。
Task.WaitAsync
我在正确取消时遇到问题
await listener.AcceptAsync()
。当我关闭侦听器时,它引发异常并且套接字未正确释放。我找到了一个使用两步任务初始化的有效解决方案,也许它会帮助处于类似情况的其他人,并且它适用于任何Socket.xxxAsync()
方法:
//just for testing: the cancellation will occur after 2000 milliseconds
using var cst = new CancellationSourceToken(2000);
try
{
//listener is your Socket instance
var handlerTask = listener.AcceptAsync();
var handler = await handlerTask.WaitAsync(cst.Token);
//then you can do the same with the handler
while (true)
{
var bytes = new byte[1024];
var receiverTask = handler.ReceiveAsync(bytes);
var bytesCount = await receiverTask.WaitAsync(cst.Token);
//then process the data batch before receiving the next one
if (your_end_of_msg_predicate())
break;
}
}
catch (OperationCanceledException) { }
由于我将所有这些以及其他方法都放在
Parallel.Invoke
中,并且我需要在计算继续之前完成所有这些,所以我最终将 await
切换为 .Result
以避免使用 调用调用的方法async
:
var handler = handlerTask.WaitAsync(cst.Token).Result;
//and a few lines later
var bytesCount = receiverTask.WaitAsync(cst.Token).Result;