有没有办法防止“请求的会话无法被接受。它可能被其他接收者锁定。”

问题描述 投票:0回答:1

我已经尝试了很多,但在我的用例中似乎没有任何效果。

我有一个react/redux前端,它设置了与我的asp.net core服务器的signalR连接。此后,前端触发一个名为“startTesting”的函数。该函数有一个循环,每 4 秒向我的前端发送一次更新。更新内的数据来自 Azure 消息会话,并且需要采用 FIFO。我使用单例来获取此数据并将其传递到 signalR 集线器,在该集线器中对其进行处理并发送到前端。单例消息处理器代码如下所示:

public async Task<string> ReceiveMessage(string imei, CancellationToken ct = default) 
{ 
    var receiver = await _sessionClient.AcceptMessageSessionAsync(imei);

    try
    {
        if (!_processingSessions.TryAdd(imei, true))
        {
            throw new Exception("Session is already being processed");
        }

        await Task.Delay(1000, ct);

        Message message = await receiver.ReceiveAsync(TimeSpan.FromMilliseconds(10));

        if (message == null) return null;

        _logger.LogInformation("Message received: {Message}", message.Body);

        await receiver.CompleteAsync(message.SystemProperties.LockToken);
        
        return Encoding.UTF8.GetString(message.Body);
    }
    catch (Exception e)
    {
        _logger.LogError("Service Bus Exception: {ExMessage}", e.Message);
        throw new Exception(e.Message);
    }
    finally
    {
        // _logger.LogInformation("Closing Message Session");
        _processingSessions.TryRemove(imei, out var result);
            imei, result);
        await receiver.CloseAsync();
    }
}

这一切在“快乐的流程”中运行良好。我启动连接,测试开始,然后触发。但是,一旦 signalR 端发生刷新,旧连接就会被释放并创建新连接,这会产生错误“无法接受请求的会话。它可能被另一个接收器锁定。”。即使我在finally中说await receive.closeAsync()。

有人可以帮我吗,我无法弄清楚这个问题。顺便说一句:我已经尝试过 ServiceBusSessionReceiver。

我尝试过的ServiceBusSessionReceiver:


//Constructor:
// _serviceBusClient = serviceBusClient.CreateClient("DeviceTestQueue");

//Message processor receiver:
// var receiver = await _serviceBusClient.AcceptSessionAsync(
//     _configuration.GetSection("DEVICETESTMSGQUEUE").Value,
//     imei, cancellationToken: ct
// );

我不确定它是否有任何帮助,但这是我的 signalR 前端代码(我可能在会话管理方面做错了):

import {
    HttpTransportType,
    HubConnectionBuilder,
    HubConnectionState,
    JsonHubProtocol,
    LogLevel
} from '@microsoft/signalr';
import * as types from "../actions/actionsTypes";

const baseUrl = process.env.SOCKET_URL + 'devicetests/ws';
const protocol = new JsonHubProtocol();

let connection = null;

export const initializeConnection = (imei) => async (dispatch) => {
    let url = `not relevant`;

    if (!connection) {
        console.log("Creating new DeviceTestSocket connection");

        connection = new HubConnectionBuilder().withUrl(
            url,
            {
                // accessTokenFactory: () => accessToken,
                transport: HttpTransportType.WebSockets | HttpTransportType.LongPolling,
                logMessageContent: true,
                logger: LogLevel.Debug,
            })
            .withHubProtocol(protocol)
            .build();
    }

    await connection.start().then(() => {
        console.log("Connected to DeviceTestSocket");

        startTesting(imei);
    });


    await connection.on("SendTests", (message) => {
        console.log("SendTests Triggered");
        dispatch({
            type: types.INITIALIZE_TESTS,
            payload: {
                deviceTests: message
            }
        })
    });

    await connection.on("UpdateTest", (message) => {
        console.log("UpdateTest Triggered");
        dispatch({
            type: types.UPDATE_TEST,
            payload: {
                deviceTests: message
            }
        })
    });
}

export const startTesting = async (imei) => {
    if (isConnectionOpen) {
        await connection.invoke("StartTesting", imei, new Date());
    }
}

export const closeConnection = () => async (dispatch) => {
    if (isConnectionOpen) {
        connection.stop().then(() => {
            console.log("Disconnected from DeviceTestSocket");
        });

        dispatch({
            type: types.FINISH_TESTS,
            payload: {
                deviceTests: []
            }
        });
    }
}

export const isConnectionOpen = () => connection && connection.state === HubConnectionState.Connected;

希望这有帮助!

已编辑

实施 Rick Montalvo 的解决方案后,效果更好,但经过严格测试后,我不断收到错误。

我也会在这里分享我的 signalR 代码,它调用我已经共享的“ReceiveMessage”函数。我的 SignalR 代码如下所示:

public class DeviceTestHub : Hub<IDeviceTestClient>
{
    private readonly ILogger<DeviceTestHub> _logger;
    private readonly IMemoryCache _cache;
    private readonly IMessageProcessor _messageProcessor;
    private readonly IDeviceTestHandler _deviceTestHandler;
    private readonly TimeSpan _cacheExpirationTime = TimeSpan.FromMinutes(15);
    private readonly CancellationTokenSource _cts;


    public DeviceTestHub(IDeviceTestHandler deviceTestHandler, IMemoryCache memoryCache,
        ILogger<DeviceTestHub> logger, IMessageProcessor messageProcessor)
    {
        _cache = memoryCache;
        _logger = logger;
        _messageProcessor = messageProcessor;
        _deviceTestHandler = deviceTestHandler;
        
        //Set cancellation token for 12 minutes to cancel the test if it takes longer than 12 minutes
        _cts = new CancellationTokenSource(TimeSpan.FromMinutes(12));
    }

    public override async Task OnConnectedAsync()
    {
        //Get IMEI from header
        var context = Context.GetHttpContext();

        //Try get imei from header
        if (!context!.Request.Query.TryGetValue("imei", out var imei)) return;

        _logger.LogInformation("Connection established for device: {imei}", imei);

        //Connect to hub
        await base.OnConnectedAsync();

        var savedTests = _cache.Get<List<SocketModel>>($"{MemoryCacheKeys.DeviceTest}-{imei}");

        if (savedTests != null)
        {
            //Send saved tests to device
            await Clients.Caller.SendTests(savedTests);

            return;
        }

        //Get socket tests
        var socketTests = await _deviceTestHandler.GetTests(imei);

        //Save socket tests that specific imei has to run in case of disconnection, refresh or other connection
        _cache.Set($"{MemoryCacheKeys.DeviceTest}-{imei}", socketTests, _cacheExpirationTime);


        //Send list with tests to frontend
        await Clients.Caller.SendTests(socketTests);

        //Remove possible old session imei's from cache
        await _messageProcessor.Dispose(imei);
    }

    public async Task StartTesting(string imei, DateTime startDate)
    {
        try
        {
            //Clear session before start testing
            await _messageProcessor.Dispose(imei);

            //Do the actual tests
            //Get raw message from Message Session, send it to the frontend but also pass it to the test handler
            var socketTests =
                _cache.Get<List<SocketModel>>($"{MemoryCacheKeys.DeviceTest}-{imei}")
                ?? await _deviceTestHandler.GetTests(imei);

            var messages = new List<RawMessageDTO>();

            while (!_cts.IsCancellationRequested)
            {
                _cts.Token.ThrowIfCancellationRequested();

                if (socketTests.Any(x => x.State == State.NotStarted))
                {
                    //Loop through tests and return the first test that is not started yet and set it to loading
                    var test = _deviceTestHandler.SetTestActive(socketTests, _cts.Token);

                    //Send the changed test to frontend
                    if (test != null) await Clients.Caller.UpdateTest(test);
                }
                
                //Get raw message from Message Session
                var message = await _messageProcessor.ReceiveMessage(imei, _cts.Token);

                if (message.IsNullOrEmpty() && messages.IsNullOrEmpty())
                {
                    await Task.Delay(3000, _cts.Token);
                    continue;
                }

                if (!message.IsNullOrEmpty())
                {
                    var rawMsg = await _deviceTestHandler.ConvertMessageToRawMessage(message);

                    //Send raw message to frontend
                    await Clients.Caller.SendRawMessage(JsonSerializer.Serialize(rawMsg));

                    messages.Add(rawMsg);
                }

                var activeTest = socketTests.FirstOrDefault(x => x.State is State.Loading or State.Inprogress);

                if (activeTest is null) return;

                var executedTest = await _deviceTestHandler.ExecuteTest(imei, activeTest, messages, startDate);

                if (executedTest != null)
                {
                    //Send the changed test to frontend
                    await Clients.Caller.UpdateTest(executedTest);
                }
            }
        }
        catch (OperationCanceledException e)
        {
            await Clients.Caller.SendMessage($"Test took longer than 12 minutes and is cancelled for imei: {imei}");
            await _messageProcessor.Dispose(imei);
            await OnDisconnectedAsync(e);
        }
        catch (Exception ex)
        {
            _logger.LogError($"");
            await _messageProcessor.Dispose(imei);
            await Clients.Caller.SendMessage(ex.Message);
        }
    }

    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        _cts.Cancel();
        
        //Get IMEI from header
        var context = Context.GetHttpContext();

        //Try get imei from header
        if (!context!.Request.Query.TryGetValue("imei", out var imei)) return;

        await _messageProcessor.Dispose(imei);

        if (exception != null)
            _logger.LogError("Error!");

        await base.OnDisconnectedAsync(exception);
    }
}
c# azure react-native signalr
1个回答
1
投票

根据提供的信息,您对操作创建器和异步函数使用相同的名称startTesting。这可能会导致混乱和意外行为。让我们进行一些调整,以确保正确的 SignalR 连接处理和消息处理,同时维护消息的 FIFO 顺序:

  1. 重构动作创建者和异步函数以具有不同的 名称。我们将调用动作创建者initializeConnection 和 异步函数 sendStartTestingRequest。

  2. 实现处理SignalR连接断开的机制 在处理之前通过调用 closeConnection 优雅地 联系。这将确保任何活动的消息会话 在连接停止之前正确关闭。

  3. 使用队列存储来自 SignalR 中心的传入消息。这 将有助于维持消息的 FIFO 顺序,即使 连接已刷新或处理有延迟。

下面的代码应该处理连接和断开连接,并且它应该维护消息的先进先出顺序。

//  signalR.js
//  This is based on the code that you have provided and based on 
//  what I can see in your comments and code.  Outside of that, 
//  the answer to your question/problem is between your code 
//  and what I have posted here.
import {
    HttpTransportType,
    HubConnectionBuilder,
    HubConnectionState,
    JsonHubProtocol,
    LogLevel
} from '@microsoft/signalr';
import * as types from "../actions/actionsTypes";

const baseUrl = process.env.SOCKET_URL + 'devicetests/ws';
const protocol = new JsonHubProtocol();

let connection = null;

export const initializeConnection = (imei) => async (dispatch) => {
    let url = `not relevant`;

    try {
        if (!connection) {
            console.log("Creating new DeviceTestSocket connection");

            connection = new HubConnectionBuilder().withUrl(
                url,
                {
                    // accessTokenFactory: () => accessToken,
                    transport: HttpTransportType.WebSockets | HttpTransportType.LongPolling,
                    logMessageContent: true,
                    logger: LogLevel.Debug,
                })
                .withHubProtocol(protocol)
                .build();
        }

        await connection.start();
        console.log("Connected to DeviceTestSocket");

        if (isConnectionOpen()) {
            startTesting(imei);
        }

        await connection.on("SendTests", (message) => {
            console.log("SendTests Triggered");
            dispatch({
                type: types.INITIALIZE_TESTS,
                payload: {
                    deviceTests: message
                }
            })
        });

        await connection.on("UpdateTest", (message) => {
            console.log("UpdateTest Triggered");
            dispatch({
                type: types.UPDATE_TEST,
                payload: {
                    deviceTests: message
                }
            })
        });
    } catch (error) {
        console.error("Error starting SignalR connection:", error);
    }
}

export const startTesting = async (imei) => {
    try {
        if (isConnectionOpen()) {
            await connection.invoke("StartTesting", imei, new Date());
        }
    } catch (error) {
        console.error("Error invoking 'StartTesting':", error);
    }
}

export const closeConnection = () => async (dispatch) => {
    if (isConnectionOpen()) {
        try {
            await connection.stop();
            console.log("Disconnected from DeviceTestSocket");
        } catch (error) {
            console.error("Error closing SignalR connection:", error);
        }

        dispatch({
            type: types.FINISH_TESTS,
            payload: {
                deviceTests: []
            }
        });
    }
}

export const isConnectionOpen = () => connection && connection.state === HubConnectionState.Connected;

© www.soinside.com 2019 - 2024. All rights reserved.