我要跟随 https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-7.0&tabs=visual-studio#queued-background-tasks-1 并为多个 TCP 客户端编写信号服务器。浏览器发送带有 ip 和端口的请求,该请求在集线器中排队。我的 backgroundservice 读取使该请求出列并启动范围服务的 DoWork 方法,该方法连接到 ip 和端口并开始将数据发送到集线器。这部分工作正常。当浏览器发出断开连接请求时,我不确定如何断开 tcp 客户端。任何帮助表示赞赏。
中心:
public class LiveDataHub : Hub
{
private readonly IMyBackgroundQueue _queue;
public LiveDataHub(IMyBackgroundQueue queue) => _queue = queue;
public async Task ConnectAndGetData(string caller, string command)
{
Console.WriteLine("New connection request: " + caller);
await Clients.Caller.SendAsync("Send", $"{caller} has requested {command}");
QueueItem item = new QueueItem() { Caller = caller, Command = command };
await _queue.QueueAsync(item);
}
public string GetConnectionId()
{
return Context.ConnectionId;
}
public async Task Disconnect(string caller, string command)
{
Console.WriteLine("Disconnect: " + caller);
await Clients.Caller.SendAsync("Send", $"{caller} has requested {command}");
QueueItem item = new QueueItem() { Caller = caller, Command = command };
await _queue.QueueAsync(item);
}
}
MyBackgroundQueue.cs:
public interface IMyBackgroundQueue
{
ValueTask QueueAsync(QueueItem item);
IAsyncEnumerable<QueueItem> DequeueAllAsync(
CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{
private readonly Channel<QueueItem> _channel;
public MyBackgroundQueue(int capacity)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_channel = Channel.CreateBounded<QueueItem>(options);
}
public ValueTask QueueAsync(QueueItem item) => _channel.Writer.WriteAsync(item);
public IAsyncEnumerable<QueueItem> DequeueAllAsync(CancellationToken ct) =>
_channel.Reader.ReadAllAsync(ct);
}
QueueItem.cs:
public class QueueItem
{
public string Command { get; set; }
public string Caller { get; set; }
}
public class ScopedServiceItem
{
public CancellationTokenSource Token{get;set;}
public string ConnectionID { get; set; }
}
ScopedProcessingService.cs:
public async Task DoWork(CancellationToken stoppingToken, IHubContext<LiveDataHub> liveDataHubContext, string callerName)
{
while (!stoppingToken.IsCancellationRequested)
{
stoppingToken.Register(() =>
{
//close tcp connection
Console.WriteLine("Token Cancelled! Cleaning up and exiting...");
}
);
var eventMessage = new Models.EventMessage($"{callerName}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
await liveDataHubContext.Clients.Client(callerName).SendAsync("onMessageReceived", eventMessage, stoppingToken);
await Task.Delay(1000, stoppingToken);
}
}
我的后台服务.cs
public class ConsumeScopedServiceHostedService : BackgroundService
{
private readonly ILogger<ConsumeScopedServiceHostedService> _logger;
private readonly IHubContext<LiveDataHub> _liveDataHubContext;
public IMyBackgroundQueue _queue { get; }
private List<ScopedServiceItem> scopedServicesList;
public ConsumeScopedServiceHostedService(IServiceProvider services,
ILogger<ConsumeScopedServiceHostedService> logger,
IHubContext<LiveDataHub> liveDataHubContext)
{
Services = services;
_logger = logger;
_liveDataHubContext = liveDataHubContext;
}
public ConsumeScopedServiceHostedService(IServiceProvider services,
ILogger<ConsumeScopedServiceHostedService> logger,
IHubContext<LiveDataHub> liveDataHubContext, IMyBackgroundQueue queue)
{
Services = services;
_logger = logger;
_liveDataHubContext = liveDataHubContext;
_queue = queue;
scopedServicesList = new List<ScopedServiceItem>()
}
public IServiceProvider Services { get; }
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
"Consume Scoped Service Hosted Service running.");
while (!stoppingToken.IsCancellationRequested)
{
await foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
{
QueueItem item = (QueueItem)_item;
if (item.Command.ToLower() == "disconnect")
{
Console.WriteLine("ConsumeScopedService: New request[ "+ item.Caller + "]:" + item.Command);
//here, I want to stop the ScopedService's CancelWork which will have disconnecting tcpclient and disposing of object. How do I do this?
//CancelWork(cancellationToken, item.Caller);
} else if(item.Command.ToLower() == "connect") {
Console.WriteLine("ConsumeScopedService: New request[ " + item.Caller + "]:" + item.Command);
CancellationTokenSource cancellationToken = new CancellationTokenSource();
DoWork(cancellationToken.Token, item.Caller);
}
}
await Task.Delay(1000, stoppingToken);
}
}
private async Task CancelWork(CancellationTokenSource cancellationToken)
{
cancellationToken.Cancel();
}
private async Task DoWork(CancellationToken stoppingToken, string callerName)
{
_logger.LogInformation(
"Consume Scoped Service Hosted Service is working.");
using (var scope = Services.CreateScope())
{
var scopedProcessingService =
scope.ServiceProvider
.GetRequiredService<IScopedProcessingService>();
scopedServicesList.Add(new ScopedServiceItem() { Token = stoppingToken, ConnectionID = callerName });
await scopedProcessingService.DoWork(stoppingToken, _liveDataHubContext, callerName);
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
"Consume Scoped Service Hosted Service is stopping.");
await base.StopAsync(stoppingToken);
}
}
如果有更好的方法,请告诉我。我的目标是托管一个信号服务器,该服务器根据浏览器发送的 IP 和端口在内部连接到单独的 tcp 服务器。
谢谢