如何释放Azure服务总线会话锁?

问题描述 投票:0回答:1

我正在使用具有启用会话订阅的服务总线主题,并且我想在处理一条消息后立即释放会话锁定。例如,如果第一个 ServiceBusSessionProcessor 获取消息并调用 CompleteMessageAsync,则另一个 ServiceBusSessionProcessor 应该能够获取同一 sessionId 的消息。

目前我必须等待 5 分钟,锁才会过期,然后第二个处理器才能开始获取消息。

azure azureservicebus azure-servicebus-topics azure-servicebus-subscriptions
1个回答
0
投票

我尝试使用下面的 C# 控制台代码创建一个服务总线会话处理器,用于处理具有“不同会话 ID”的消息,其中包含发送至指定主题和订阅的 5 条消息。会话处理器在完成消息处理之前会延迟 10 秒来处理每条消息。

代码:

using System; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; class Program { const string ServiceBusConnectionString = "<ServiceBusTopic_connecString>"; const string TopicName = "<topic_name>"; const string SubscriptionName = "<subscription_name>"; static async Task Main() { await SendMessageAsync(); var serviceBusClient = new ServiceBusClient(ServiceBusConnectionString); var sessionProcessor = serviceBusClient.CreateSessionProcessor(TopicName, SubscriptionName, new ServiceBusSessionProcessorOptions { MaxConcurrentSessions = 1 }); sessionProcessor.ProcessMessageAsync += ProcessMessagesAsync; sessionProcessor.ProcessErrorAsync += ExceptionReceivedHandler; try { Console.WriteLine("Starting session processor..."); await sessionProcessor.StartProcessingAsync(); Console.WriteLine("Press any key to exit..."); Console.ReadKey(); } finally { Console.WriteLine("Closing session processor..."); await sessionProcessor.CloseAsync(); await serviceBusClient.DisposeAsync(); Console.WriteLine("Exiting application..."); } } static async Task SendMessageAsync() { var serviceBusClient = new ServiceBusClient(ServiceBusConnectionString); var sender = serviceBusClient.CreateSender(TopicName); try { for (int i = 1; i <= 5; i++) { var sessionId = $"session-{i}"; var messageBody = $"Message {i}"; var message = new ServiceBusMessage(messageBody) { SessionId = sessionId }; await sender.SendMessageAsync(message); Console.WriteLine($"Message sent with SessionId: {sessionId}"); } } finally { await sender.CloseAsync(); await serviceBusClient.DisposeAsync(); } } static async Task ProcessMessagesAsync(ProcessSessionMessageEventArgs args) { try { var session = args.SessionId; var message = args.Message; Console.WriteLine($"Received message: SessionId = {session}, SequenceNumber = {message.SequenceNumber}"); await Task.Delay(TimeSpan.FromSeconds(10)); await args.CompleteMessageAsync(message); Console.WriteLine($"Completed message: SessionId = {session}, SequenceNumber = {message.SequenceNumber}"); } catch (Exception ex) { Console.WriteLine($"Error processing message: {ex.Message}"); } } static Task ExceptionReceivedHandler(ProcessErrorEventArgs args) { Console.WriteLine($"Message handler encountered an exception: {args.Exception}"); return Task.CompletedTask; } }

输出:

运行成功如下。

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.