Blazor WebAssembly SignalR 流 - 不能有多个组件

问题描述 投票:0回答:1

我正在学习使用 Blazor WebAssembly 和 SignalR。我想要拥有单个 WebSocket 连接(因此需要一个 HubConnection 实例),并在需要通过 SignalR WebSockets 执行某些操作的所有 Blazor 组件之间共享该连接。

我想调用 HubConnection.StreamAsync() ,然后在数据进入时循环生成的流(这是一个无尽的流)。我还希望它在组件不再显示时取消流。

我对这个想法有两个问题:

  1. 只有一个组件能够传输值。其他组件卡在“await foreach”上并且永远不会收到任何物品。
  2. 在组件处置期间,我触发了取消令牌,但服务器端没有收到取消。只有当我刷新整个浏览器时,才会触发取消。
    • 当我遍历到任何其他 Blazor 页面,然后返回到我的流组件所在的页面时,现在没有组件接收项目。

我有一种模糊的预感,这两件事以某种方式连接起来 - 不知何故在服务器端,单个集线器只能真正响应单个连接中的单个 StreamAsync() 调用?

最终我还是没能找到解决办法。我究竟做错了什么?也许你们中的任何人都可以帮我解决这个问题?

代码

要复制,您可以按照说明操作:

从 Blazor WebAssembly 和 ASP.NET MVC 后端项目开始。我使用.net core 6,大多数nugets都是6.0.7版本。我还在 Blazor 客户端项目上安装了 nuget Microsoft.AspNetCore.SignalR.Client。

进行以下更改/更新:

客户端 - App.razor 添加此代码

@using Microsoft.AspNetCore.SignalR.Client
@implements IAsyncDisposable
@inject HubConnection HubConnection

...

@code {
    private CancellationTokenSource cts = new();

    protected override void OnInitialized()
    {
        base.OnInitialized();

        HubConnection.Closed += error =>
        {
            return ConnectWithRetryAsync(cts.Token);
        };

        _ = ConnectWithRetryAsync(cts.Token);
    }

    private async Task<bool> ConnectWithRetryAsync(CancellationToken token)
    {
        // Keep trying to until we can start or the token is canceled.
        while (true)
        {
            try
            {
                await HubConnection.StartAsync(token);
                return true;
            }
            catch when (token.IsCancellationRequested)
            {
                return false;
            }
            catch
            {
                // Try again in a few seconds. This could be an incremental interval           
                await Task.Delay(5000);
            }
        }
    }

    public async ValueTask DisposeAsync()
    {
        cts.Cancel();
        cts.Dispose();
        await HubConnection.DisposeAsync();
    }
}

客户端-Program.cs 将以下单例添加到 DI

builder.Services.AddSingleton(sp =>
{
    var navMan = sp.GetRequiredService<NavigationManager>();
    return new HubConnectionBuilder()
        .WithUrl(navMan.ToAbsoluteUri("/string"))
        .WithAutomaticReconnect()
        .Build();
});

Client - 创建一个名为“StringDisplay”的组件

@using Microsoft.AspNetCore.SignalR.Client
@inject HubConnection HubConnection
@implements IDisposable

@if(currentString == string.Empty)
{
    <i>Loading...</i>
}
else
{
    @currentString
}

@code {
    private string currentString = string.Empty;
    private CancellationTokenSource cts = new();

    protected override void OnInitialized()
    {
        base.OnInitialized();

        _ = Consumer();
    }

    protected override void OnParametersSet()
    {
        base.OnParametersSet();

        _ = Consumer();
    }

    private async Task Consumer()
    {
        try
        {
            cts.Cancel();
            cts.Dispose();
            cts = new();

            var stream = HubConnection.StreamAsync<string>("GetStrings", cts.Token);

            await foreach(var str in stream)
            {
                if(cts.IsCancellationRequested)
                    break;

                currentString = str;
                StateHasChanged();
            }
        }
        catch(Exception e)
        {
            Console.WriteLine(e);
        }
    }

    public void Dispose()
    {
        cts.Cancel();
        cts.Dispose();
    }
}

客户端 - Index.razor 在页面上添加 StringDisplay 组件 3 次:

<hr />

<StringDisplay /><hr />
<StringDisplay /><hr />
<StringDisplay /><hr />

服务器 - 创建 StringGeneratorService.cs

namespace BlazorWebAssembly.Server.Services;

public class StringGeneratorService
{
    private readonly PeriodicTimer _timer;

    public event Action<string>? OnGenerated;

    public StringGeneratorService()
    {
        _timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));

        Task.Run(TimerRunnerAsync);
    }

    private async Task TimerRunnerAsync()
    {
        while (true)
        {
            await _timer.WaitForNextTickAsync();

            var str = Guid.NewGuid().ToString();

            OnGenerated?.Invoke(str);
        }
    }
}

服务器 - 创建 StringHub.cs

using BlazorWebAssembly.Server.Services;
using Microsoft.AspNetCore.SignalR;
using System.Runtime.CompilerServices;

namespace BlazorWebAssembly.Server.Hubs
{
    public class StringHub : Hub
    {
        private readonly StringGeneratorService _generatorService;

        public StringHub(StringGeneratorService generatorService)
        {
            _generatorService = generatorService;
        }

        public async IAsyncEnumerable<string> GetStrings([EnumeratorCancellation] CancellationToken cancellationToken)
        {
            using var flag = new AutoResetEvent(false);

            string currentString = string.Empty;

            var listener = (string str) => { currentString = str; flag.Set(); };

            _generatorService.OnGenerated += listener;

            cancellationToken.Register(() =>
            {
                _generatorService.OnGenerated -= listener;
            });

            while (!cancellationToken.IsCancellationRequested)
            {
                flag.WaitOne();

                yield return currentString;
            }

            yield break;
        }

    }
}

服务器-Program.cs 注册必要的零件

builder.Services.AddSingleton<StringGeneratorService>();
...
app.MapHub<StringHub>("/string");
c# blazor signalr blazor-webassembly signalr-hub
1个回答
0
投票

我自己发现了这个问题。

核心问题是 - SignalR Hub 是单线程的单例。

如果你看

StringHub.cs
while
循环中有一行:
flag.WaitOne();
。这会阻塞该 Hub 实现的整个线程。

我为自己做了一个

AutoResetEvent
的扩展,它允许我异步等待信号。扩展名是这样的:

public static async Task<bool> WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken)
{
    try
    {
        return await Task.Run(waitHandle.WaitOne, cancellationToken);
    }
    catch (TaskCanceledException)
    {
        return false;
    }
}

有了这个扩展,我可以用以下行等待信号:

await flag.WaitOneAsync(cancellationToken);

所以请记住 - 集线器本身是单线程的!不要阻塞他们的帖子!

© www.soinside.com 2019 - 2024. All rights reserved.