我已经尝试了很多,但在我的用例中似乎没有任何效果。
我有一个react/redux前端,它设置了与我的asp.net core服务器的signalR连接。此后,前端触发一个名为“startTesting”的函数。该函数有一个循环,每 4 秒向我的前端发送一次更新。更新内的数据来自 Azure 消息会话,并且需要采用 FIFO。我使用单例来获取此数据并将其传递到 signalR 集线器,在该集线器中对其进行处理并发送到前端。单例消息处理器代码如下所示:
public async Task<string> ReceiveMessage(string imei, CancellationToken ct = default)
{
var receiver = await _sessionClient.AcceptMessageSessionAsync(imei);
try
{
if (!_processingSessions.TryAdd(imei, true))
{
throw new Exception("Session is already being processed");
}
await Task.Delay(1000, ct);
Message message = await receiver.ReceiveAsync(TimeSpan.FromMilliseconds(10));
if (message == null) return null;
_logger.LogInformation("Message received: {Message}", message.Body);
await receiver.CompleteAsync(message.SystemProperties.LockToken);
return Encoding.UTF8.GetString(message.Body);
}
catch (Exception e)
{
_logger.LogError("Service Bus Exception: {ExMessage}", e.Message);
throw new Exception(e.Message);
}
finally
{
// _logger.LogInformation("Closing Message Session");
_processingSessions.TryRemove(imei, out var result);
imei, result);
await receiver.CloseAsync();
}
}
这一切在“快乐的流程”中运行良好。我启动连接,测试开始,然后触发。但是,一旦 signalR 端发生刷新,旧连接就会被释放并创建新连接,这会产生错误“无法接受请求的会话。它可能被另一个接收器锁定。”。即使我在finally中说await receive.closeAsync()。
有人可以帮我吗,我无法弄清楚这个问题。顺便说一句:我已经尝试过 ServiceBusSessionReceiver。
我尝试过的ServiceBusSessionReceiver:
//Constructor:
// _serviceBusClient = serviceBusClient.CreateClient("DeviceTestQueue");
//Message processor receiver:
// var receiver = await _serviceBusClient.AcceptSessionAsync(
// _configuration.GetSection("DEVICETESTMSGQUEUE").Value,
// imei, cancellationToken: ct
// );
我不确定它是否有任何帮助,但这是我的 signalR 前端代码(我可能在会话管理方面做错了):
import {
HttpTransportType,
HubConnectionBuilder,
HubConnectionState,
JsonHubProtocol,
LogLevel
} from '@microsoft/signalr';
import * as types from "../actions/actionsTypes";
const baseUrl = process.env.SOCKET_URL + 'devicetests/ws';
const protocol = new JsonHubProtocol();
let connection = null;
export const initializeConnection = (imei) => async (dispatch) => {
let url = `not relevant`;
if (!connection) {
console.log("Creating new DeviceTestSocket connection");
connection = new HubConnectionBuilder().withUrl(
url,
{
// accessTokenFactory: () => accessToken,
transport: HttpTransportType.WebSockets | HttpTransportType.LongPolling,
logMessageContent: true,
logger: LogLevel.Debug,
})
.withHubProtocol(protocol)
.build();
}
await connection.start().then(() => {
console.log("Connected to DeviceTestSocket");
startTesting(imei);
});
await connection.on("SendTests", (message) => {
console.log("SendTests Triggered");
dispatch({
type: types.INITIALIZE_TESTS,
payload: {
deviceTests: message
}
})
});
await connection.on("UpdateTest", (message) => {
console.log("UpdateTest Triggered");
dispatch({
type: types.UPDATE_TEST,
payload: {
deviceTests: message
}
})
});
}
export const startTesting = async (imei) => {
if (isConnectionOpen) {
await connection.invoke("StartTesting", imei, new Date());
}
}
export const closeConnection = () => async (dispatch) => {
if (isConnectionOpen) {
connection.stop().then(() => {
console.log("Disconnected from DeviceTestSocket");
});
dispatch({
type: types.FINISH_TESTS,
payload: {
deviceTests: []
}
});
}
}
export const isConnectionOpen = () => connection && connection.state === HubConnectionState.Connected;
希望这有帮助!
已编辑
实施 Rick Montalvo 的解决方案后,效果更好,但经过严格测试后,我不断收到错误。
我也会在这里分享我的 signalR 代码,它调用我已经共享的“ReceiveMessage”函数。我的 SignalR 代码如下所示:
public class DeviceTestHub : Hub<IDeviceTestClient>
{
private readonly ILogger<DeviceTestHub> _logger;
private readonly IMemoryCache _cache;
private readonly IMessageProcessor _messageProcessor;
private readonly IDeviceTestHandler _deviceTestHandler;
private readonly TimeSpan _cacheExpirationTime = TimeSpan.FromMinutes(15);
private readonly CancellationTokenSource _cts;
public DeviceTestHub(IDeviceTestHandler deviceTestHandler, IMemoryCache memoryCache,
ILogger<DeviceTestHub> logger, IMessageProcessor messageProcessor)
{
_cache = memoryCache;
_logger = logger;
_messageProcessor = messageProcessor;
_deviceTestHandler = deviceTestHandler;
//Set cancellation token for 12 minutes to cancel the test if it takes longer than 12 minutes
_cts = new CancellationTokenSource(TimeSpan.FromMinutes(12));
}
public override async Task OnConnectedAsync()
{
//Get IMEI from header
var context = Context.GetHttpContext();
//Try get imei from header
if (!context!.Request.Query.TryGetValue("imei", out var imei)) return;
_logger.LogInformation("Connection established for device: {imei}", imei);
//Connect to hub
await base.OnConnectedAsync();
var savedTests = _cache.Get<List<SocketModel>>($"{MemoryCacheKeys.DeviceTest}-{imei}");
if (savedTests != null)
{
//Send saved tests to device
await Clients.Caller.SendTests(savedTests);
return;
}
//Get socket tests
var socketTests = await _deviceTestHandler.GetTests(imei);
//Save socket tests that specific imei has to run in case of disconnection, refresh or other connection
_cache.Set($"{MemoryCacheKeys.DeviceTest}-{imei}", socketTests, _cacheExpirationTime);
//Send list with tests to frontend
await Clients.Caller.SendTests(socketTests);
//Remove possible old session imei's from cache
await _messageProcessor.Dispose(imei);
}
public async Task StartTesting(string imei, DateTime startDate)
{
try
{
//Clear session before start testing
await _messageProcessor.Dispose(imei);
//Do the actual tests
//Get raw message from Message Session, send it to the frontend but also pass it to the test handler
var socketTests =
_cache.Get<List<SocketModel>>($"{MemoryCacheKeys.DeviceTest}-{imei}")
?? await _deviceTestHandler.GetTests(imei);
var messages = new List<RawMessageDTO>();
while (!_cts.IsCancellationRequested)
{
_cts.Token.ThrowIfCancellationRequested();
if (socketTests.Any(x => x.State == State.NotStarted))
{
//Loop through tests and return the first test that is not started yet and set it to loading
var test = _deviceTestHandler.SetTestActive(socketTests, _cts.Token);
//Send the changed test to frontend
if (test != null) await Clients.Caller.UpdateTest(test);
}
//Get raw message from Message Session
var message = await _messageProcessor.ReceiveMessage(imei, _cts.Token);
if (message.IsNullOrEmpty() && messages.IsNullOrEmpty())
{
await Task.Delay(3000, _cts.Token);
continue;
}
if (!message.IsNullOrEmpty())
{
var rawMsg = await _deviceTestHandler.ConvertMessageToRawMessage(message);
//Send raw message to frontend
await Clients.Caller.SendRawMessage(JsonSerializer.Serialize(rawMsg));
messages.Add(rawMsg);
}
var activeTest = socketTests.FirstOrDefault(x => x.State is State.Loading or State.Inprogress);
if (activeTest is null) return;
var executedTest = await _deviceTestHandler.ExecuteTest(imei, activeTest, messages, startDate);
if (executedTest != null)
{
//Send the changed test to frontend
await Clients.Caller.UpdateTest(executedTest);
}
}
}
catch (OperationCanceledException e)
{
await Clients.Caller.SendMessage($"Test took longer than 12 minutes and is cancelled for imei: {imei}");
await _messageProcessor.Dispose(imei);
await OnDisconnectedAsync(e);
}
catch (Exception ex)
{
_logger.LogError($"");
await _messageProcessor.Dispose(imei);
await Clients.Caller.SendMessage(ex.Message);
}
}
public override async Task OnDisconnectedAsync(Exception? exception)
{
_cts.Cancel();
//Get IMEI from header
var context = Context.GetHttpContext();
//Try get imei from header
if (!context!.Request.Query.TryGetValue("imei", out var imei)) return;
await _messageProcessor.Dispose(imei);
if (exception != null)
_logger.LogError("Error!");
await base.OnDisconnectedAsync(exception);
}
}
根据提供的信息,您对操作创建器和异步函数使用相同的名称startTesting。这可能会导致混乱和意外行为。让我们进行一些调整,以确保正确的 SignalR 连接处理和消息处理,同时维护消息的 FIFO 顺序:
重构动作创建者和异步函数以具有不同的 名称。我们将调用动作创建者initializeConnection 和 异步函数 sendStartTestingRequest。
实现处理SignalR连接断开的机制 在处理之前通过调用 closeConnection 优雅地 联系。这将确保任何活动的消息会话 在连接停止之前正确关闭。
使用队列存储来自 SignalR 中心的传入消息。这 将有助于维持消息的 FIFO 顺序,即使 连接已刷新或处理有延迟。
下面的代码应该处理连接和断开连接,并且它应该维护消息的先进先出顺序。
// signalR.js
// This is based on the code that you have provided and based on
// what I can see in your comments and code. Outside of that,
// the answer to your question/problem is between your code
// and what I have posted here.
import {
HttpTransportType,
HubConnectionBuilder,
HubConnectionState,
JsonHubProtocol,
LogLevel
} from '@microsoft/signalr';
import * as types from "../actions/actionsTypes";
const baseUrl = process.env.SOCKET_URL + 'devicetests/ws';
const protocol = new JsonHubProtocol();
let connection = null;
export const initializeConnection = (imei) => async (dispatch) => {
let url = `not relevant`;
try {
if (!connection) {
console.log("Creating new DeviceTestSocket connection");
connection = new HubConnectionBuilder().withUrl(
url,
{
// accessTokenFactory: () => accessToken,
transport: HttpTransportType.WebSockets | HttpTransportType.LongPolling,
logMessageContent: true,
logger: LogLevel.Debug,
})
.withHubProtocol(protocol)
.build();
}
await connection.start();
console.log("Connected to DeviceTestSocket");
if (isConnectionOpen()) {
startTesting(imei);
}
await connection.on("SendTests", (message) => {
console.log("SendTests Triggered");
dispatch({
type: types.INITIALIZE_TESTS,
payload: {
deviceTests: message
}
})
});
await connection.on("UpdateTest", (message) => {
console.log("UpdateTest Triggered");
dispatch({
type: types.UPDATE_TEST,
payload: {
deviceTests: message
}
})
});
} catch (error) {
console.error("Error starting SignalR connection:", error);
}
}
export const startTesting = async (imei) => {
try {
if (isConnectionOpen()) {
await connection.invoke("StartTesting", imei, new Date());
}
} catch (error) {
console.error("Error invoking 'StartTesting':", error);
}
}
export const closeConnection = () => async (dispatch) => {
if (isConnectionOpen()) {
try {
await connection.stop();
console.log("Disconnected from DeviceTestSocket");
} catch (error) {
console.error("Error closing SignalR connection:", error);
}
dispatch({
type: types.FINISH_TESTS,
payload: {
deviceTests: []
}
});
}
}
export const isConnectionOpen = () => connection && connection.state === HubConnectionState.Connected;