asp.net 核心:信号器:多个 tcpclients

问题描述 投票:0回答:0

我要跟随 https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-7.0&tabs=visual-studio#queued-background-tasks-1 并为多个 TCP 客户端编写信号服务器。浏览器发送带有 ip 和端口的请求,该请求在集线器中排队。我的 backgroundservice 读取使该请求出列并启动范围服务的 DoWork 方法,该方法连接到 ip 和端口并开始将数据发送到集线器。这部分工作正常。当浏览器发出断开连接请求时,我不确定如何断开 tcp 客户端。任何帮助表示赞赏。

中心:

public class LiveDataHub : Hub     
{           
    private readonly IMyBackgroundQueue _queue;
    
    public LiveDataHub(IMyBackgroundQueue queue) => _queue = queue;

    public async Task ConnectAndGetData(string caller, string command)
    {
        Console.WriteLine("New connection request: " + caller);                                    
        await Clients.Caller.SendAsync("Send", $"{caller} has requested {command}");
        QueueItem item = new QueueItem() { Caller = caller, Command = command };
        await _queue.QueueAsync(item);
    }        
    public string GetConnectionId()
    {
        return Context.ConnectionId;
    }

    public async Task Disconnect(string caller, string command)
    {
        Console.WriteLine("Disconnect: " + caller);
        await Clients.Caller.SendAsync("Send", $"{caller} has requested {command}");
        QueueItem item = new QueueItem() { Caller = caller, Command = command };            
        await _queue.QueueAsync(item);
    }
}

MyBackgroundQueue.cs:

public interface IMyBackgroundQueue
{        
    ValueTask QueueAsync(QueueItem item);        
    IAsyncEnumerable<QueueItem> DequeueAllAsync(
      CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{        
    private readonly Channel<QueueItem> _channel;
    public MyBackgroundQueue(int capacity)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<QueueItem>(options);
    }

    public ValueTask QueueAsync(QueueItem item) => _channel.Writer.WriteAsync(item);
    
    public IAsyncEnumerable<QueueItem> DequeueAllAsync(CancellationToken ct) =>
        _channel.Reader.ReadAllAsync(ct);
}

QueueItem.cs:

public class QueueItem
{
    public string Command { get; set; }
    public string Caller { get; set; }
}
public class ScopedServiceItem
{        
    public CancellationTokenSource Token{get;set;}
    public string ConnectionID { get; set; }
}

ScopedProcessingService.cs:

public async Task DoWork(CancellationToken stoppingToken, IHubContext<LiveDataHub> liveDataHubContext, string callerName)
    {
        while (!stoppingToken.IsCancellationRequested)
        { 
            stoppingToken.Register(() =>
            {
                //close tcp connection
              Console.WriteLine("Token Cancelled! Cleaning up and exiting...");
            }                            
            );

          
            var eventMessage = new Models.EventMessage($"{callerName}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
            
            await liveDataHubContext.Clients.Client(callerName).SendAsync("onMessageReceived", eventMessage, stoppingToken);                
            await Task.Delay(1000, stoppingToken);
        }
    }

    

我的后台服务.cs

public class ConsumeScopedServiceHostedService : BackgroundService
{
    private readonly ILogger<ConsumeScopedServiceHostedService> _logger;
    private readonly IHubContext<LiveDataHub> _liveDataHubContext;
    public IMyBackgroundQueue _queue { get; }
    private List<ScopedServiceItem> scopedServicesList;
    public ConsumeScopedServiceHostedService(IServiceProvider services,
        ILogger<ConsumeScopedServiceHostedService> logger,
        IHubContext<LiveDataHub> liveDataHubContext)
    {
        Services = services;
        _logger = logger;
        _liveDataHubContext = liveDataHubContext;
    }
    public ConsumeScopedServiceHostedService(IServiceProvider services,
        ILogger<ConsumeScopedServiceHostedService> logger,
        IHubContext<LiveDataHub> liveDataHubContext, IMyBackgroundQueue queue)
    {
        Services = services;
        _logger = logger;
        _liveDataHubContext = liveDataHubContext;
        _queue = queue;
        scopedServicesList = new List<ScopedServiceItem>()
    }
    public IServiceProvider Services { get; }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Consume Scoped Service Hosted Service running.");
        
        while (!stoppingToken.IsCancellationRequested)
        {
            await foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
            {
                QueueItem item = (QueueItem)_item;
                
                if (item.Command.ToLower() == "disconnect")
                {
                    Console.WriteLine("ConsumeScopedService: New request[ "+ item.Caller + "]:" + item.Command);
                    //here, I want to stop the ScopedService's CancelWork which will have disconnecting tcpclient and disposing of object. How do I do this?
                    
                    //CancelWork(cancellationToken, item.Caller);
                } else if(item.Command.ToLower() == "connect") {
                    Console.WriteLine("ConsumeScopedService: New request[ " + item.Caller + "]:" + item.Command);
                    CancellationTokenSource cancellationToken = new CancellationTokenSource();
                    DoWork(cancellationToken.Token, item.Caller);
                }
            }
            await Task.Delay(1000, stoppingToken);
        }
    }
    private async Task CancelWork(CancellationTokenSource cancellationToken)
    {
          cancellationToken.Cancel();
    }
    
    private async Task DoWork(CancellationToken stoppingToken, string callerName)
    {
        _logger.LogInformation(
            "Consume Scoped Service Hosted Service is working.");

        using (var scope = Services.CreateScope())
        {
                var scopedProcessingService =
                    scope.ServiceProvider
                        .GetRequiredService<IScopedProcessingService>();
               scopedServicesList.Add(new ScopedServiceItem() { Token = stoppingToken, ConnectionID = callerName });
                await scopedProcessingService.DoWork(stoppingToken, _liveDataHubContext, callerName);

        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Consume Scoped Service Hosted Service is stopping.");

        await base.StopAsync(stoppingToken);
    }
}

如果有更好的方法,请告诉我。我的目标是托管一个信号服务器,该服务器根据浏览器发送的 IP 和端口在内部连接到单独的 tcp 服务器。

谢谢

asp.net-core signalr
© www.soinside.com 2019 - 2024. All rights reserved.