我正在学习使用 Blazor WebAssembly 和 SignalR。我想要拥有单个 WebSocket 连接(因此需要一个 HubConnection 实例),并在需要通过 SignalR WebSockets 执行某些操作的所有 Blazor 组件之间共享该连接。
我想调用 HubConnection.StreamAsync() ,然后在数据进入时循环生成的流(这是一个无尽的流)。我还希望它在组件不再显示时取消流。
我对这个想法有两个问题:
我有一种模糊的预感,这两件事以某种方式连接起来 - 不知何故在服务器端,单个集线器只能真正响应单个连接中的单个 StreamAsync() 调用?
最终我还是没能找到解决办法。我究竟做错了什么?也许你们中的任何人都可以帮我解决这个问题?
代码
要复制,您可以按照说明操作:
从 Blazor WebAssembly 和 ASP.NET MVC 后端项目开始。我使用.net core 6,大多数nugets都是6.0.7版本。我还在 Blazor 客户端项目上安装了 nuget Microsoft.AspNetCore.SignalR.Client。
进行以下更改/更新:
客户端 - App.razor 添加此代码
@using Microsoft.AspNetCore.SignalR.Client
@implements IAsyncDisposable
@inject HubConnection HubConnection
...
@code {
private CancellationTokenSource cts = new();
protected override void OnInitialized()
{
base.OnInitialized();
HubConnection.Closed += error =>
{
return ConnectWithRetryAsync(cts.Token);
};
_ = ConnectWithRetryAsync(cts.Token);
}
private async Task<bool> ConnectWithRetryAsync(CancellationToken token)
{
// Keep trying to until we can start or the token is canceled.
while (true)
{
try
{
await HubConnection.StartAsync(token);
return true;
}
catch when (token.IsCancellationRequested)
{
return false;
}
catch
{
// Try again in a few seconds. This could be an incremental interval
await Task.Delay(5000);
}
}
}
public async ValueTask DisposeAsync()
{
cts.Cancel();
cts.Dispose();
await HubConnection.DisposeAsync();
}
}
客户端-Program.cs 将以下单例添加到 DI
builder.Services.AddSingleton(sp =>
{
var navMan = sp.GetRequiredService<NavigationManager>();
return new HubConnectionBuilder()
.WithUrl(navMan.ToAbsoluteUri("/string"))
.WithAutomaticReconnect()
.Build();
});
Client - 创建一个名为“StringDisplay”的组件
@using Microsoft.AspNetCore.SignalR.Client
@inject HubConnection HubConnection
@implements IDisposable
@if(currentString == string.Empty)
{
<i>Loading...</i>
}
else
{
@currentString
}
@code {
private string currentString = string.Empty;
private CancellationTokenSource cts = new();
protected override void OnInitialized()
{
base.OnInitialized();
_ = Consumer();
}
protected override void OnParametersSet()
{
base.OnParametersSet();
_ = Consumer();
}
private async Task Consumer()
{
try
{
cts.Cancel();
cts.Dispose();
cts = new();
var stream = HubConnection.StreamAsync<string>("GetStrings", cts.Token);
await foreach(var str in stream)
{
if(cts.IsCancellationRequested)
break;
currentString = str;
StateHasChanged();
}
}
catch(Exception e)
{
Console.WriteLine(e);
}
}
public void Dispose()
{
cts.Cancel();
cts.Dispose();
}
}
客户端 - Index.razor 在页面上添加 StringDisplay 组件 3 次:
<hr />
<StringDisplay /><hr />
<StringDisplay /><hr />
<StringDisplay /><hr />
服务器 - 创建 StringGeneratorService.cs
namespace BlazorWebAssembly.Server.Services;
public class StringGeneratorService
{
private readonly PeriodicTimer _timer;
public event Action<string>? OnGenerated;
public StringGeneratorService()
{
_timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
Task.Run(TimerRunnerAsync);
}
private async Task TimerRunnerAsync()
{
while (true)
{
await _timer.WaitForNextTickAsync();
var str = Guid.NewGuid().ToString();
OnGenerated?.Invoke(str);
}
}
}
服务器 - 创建 StringHub.cs
using BlazorWebAssembly.Server.Services;
using Microsoft.AspNetCore.SignalR;
using System.Runtime.CompilerServices;
namespace BlazorWebAssembly.Server.Hubs
{
public class StringHub : Hub
{
private readonly StringGeneratorService _generatorService;
public StringHub(StringGeneratorService generatorService)
{
_generatorService = generatorService;
}
public async IAsyncEnumerable<string> GetStrings([EnumeratorCancellation] CancellationToken cancellationToken)
{
using var flag = new AutoResetEvent(false);
string currentString = string.Empty;
var listener = (string str) => { currentString = str; flag.Set(); };
_generatorService.OnGenerated += listener;
cancellationToken.Register(() =>
{
_generatorService.OnGenerated -= listener;
});
while (!cancellationToken.IsCancellationRequested)
{
flag.WaitOne();
yield return currentString;
}
yield break;
}
}
}
服务器-Program.cs 注册必要的零件
builder.Services.AddSingleton<StringGeneratorService>();
...
app.MapHub<StringHub>("/string");
我自己发现了这个问题。
核心问题是 - SignalR Hub 是单线程的单例。
如果你看
StringHub.cs
,while
循环中有一行:flag.WaitOne();
。这会阻塞该 Hub 实现的整个线程。
我为自己做了一个
AutoResetEvent
的扩展,它允许我异步等待信号。扩展名是这样的:
public static async Task<bool> WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken)
{
try
{
return await Task.Run(waitHandle.WaitOne, cancellationToken);
}
catch (TaskCanceledException)
{
return false;
}
}
有了这个扩展,我可以用以下行等待信号:
await flag.WaitOneAsync(cancellationToken);
所以请记住 - 集线器本身是单线程的!不要阻塞他们的帖子!