我有一个使用 signalr 的 asp.net 核心项目。我正在尝试通过队列将组名作为参数传递给我的后台服务。在我的集线器中,我编写了一个函数来使用从 javascript 传入的组名加入组。我在浏览器中只收到一条消息。
这是我的代码:
LiveDataHub.cs:
public sealed class LiveDataHub : Hub
{
private readonly IMyBackgroundQueue _queue;
public LiveDataHub(IMyBackgroundQueue queue) => _queue = queue;
public async Task JoinToGroup(string group)
{
Console.WriteLine("New group joined: " + group);
await _queue.QueueAsync(group);
await Groups.AddToGroupAsync(Context.ConnectionId, group);
await Clients.Group(group).SendAsync("Send", $"{Context.ConnectionId} has joined the group {group}.");
}
}
我的背景队列.cs
public interface IMyBackgroundQueue
{
ValueTask QueueAsync(string item);
IAsyncEnumerable<string> DequeueAllAsync(
CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{
//private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
private readonly Channel<string> _channel;
public MyBackgroundQueue(int capacity)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_channel = Channel.CreateBounded<string>(options);
}
public ValueTask QueueAsync(string item) => _channel.Writer.WriteAsync(item);
public IAsyncEnumerable<string> DequeueAllAsync(CancellationToken ct) =>
_channel.Reader.ReadAllAsync(ct);
}
MyBackgroundService.cs:
public class MyBackgroundService : BackgroundService
{
private readonly IHubContext<LiveDataHub> _hub;
public IMyBackgroundQueue _queue { get; }
public MyBackgroundService(IHubContext<LiveDataHub> hub, IMyBackgroundQueue queue)
{
_hub = hub;
_queue = queue;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var eventMessage = new Models.EventMessage($"Id_{ Guid.NewGuid():N}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
await foreach(var _item in _queue.DequeueAllAsync(stoppingToken))
{
Console.WriteLine("group name: " + _item.ToString());
await _hub.Clients.Group(_item.ToString()).SendAsync("onMessageReceived", eventMessage, stoppingToken);
}
await Task.Delay(1000, stoppingToken);
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
await base.StopAsync(stoppingToken);
}
}
Startup.cs:
public void ConfigureServices(IServiceCollection services)
{
services.AddRazorPages();
services.AddSignalR(hubPotions=> { hubPotions.EnableDetailedErrors = true; });
services.AddHostedService<MyBackgroundService>();
services.AddSingleton<IMyBackgroundQueue, MyBackgroundQueue>(ctx => {
return new MyBackgroundQueue(100);
}); // I think the error is here on this line but I can't seem to figure out what it is
}
javascript.js
const signalrConnection = new signalR.HubConnectionBuilder()
.withUrl("/messagebroker")
.configureLogging(signalR.LogLevel.Information)
.build();
signalrConnection.start().then(function () {
console.log("SignalR Hub Connected");
signalrConnection.invoke("JoinToGroup", "Group01")
.catch(function (err) {
console.error(err.toString());
});
}).catch(function (err) {
signalrConnection.stop();
console.error(err.toString());
});