我正在使用具有启用会话订阅的服务总线主题,并且我想在处理一条消息后立即释放会话锁定。例如,如果第一个 ServiceBusSessionProcessor 获取消息并调用 CompleteMessageAsync,则另一个 ServiceBusSessionProcessor 应该能够获取同一 sessionId 的消息。
目前我必须等待 5 分钟,锁才会过期,然后第二个处理器才能开始获取消息。
我尝试使用下面的 C# 控制台代码创建一个服务总线会话处理器,用于处理具有“不同会话 ID”的消息,其中包含发送至指定主题和订阅的 5 条消息。会话处理器在完成消息处理之前会延迟 10 秒来处理每条消息。
代码:
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
class Program
{
const string ServiceBusConnectionString = "<ServiceBusTopic_connecString>";
const string TopicName = "<topic_name>";
const string SubscriptionName = "<subscription_name>";
static async Task Main()
{
await SendMessageAsync();
var serviceBusClient = new ServiceBusClient(ServiceBusConnectionString);
var sessionProcessor = serviceBusClient.CreateSessionProcessor(TopicName, SubscriptionName, new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 1
});
sessionProcessor.ProcessMessageAsync += ProcessMessagesAsync;
sessionProcessor.ProcessErrorAsync += ExceptionReceivedHandler;
try
{
Console.WriteLine("Starting session processor...");
await sessionProcessor.StartProcessingAsync();
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
}
finally
{
Console.WriteLine("Closing session processor...");
await sessionProcessor.CloseAsync();
await serviceBusClient.DisposeAsync();
Console.WriteLine("Exiting application...");
}
}
static async Task SendMessageAsync()
{
var serviceBusClient = new ServiceBusClient(ServiceBusConnectionString);
var sender = serviceBusClient.CreateSender(TopicName);
try
{
for (int i = 1; i <= 5; i++)
{
var sessionId = $"session-{i}";
var messageBody = $"Message {i}";
var message = new ServiceBusMessage(messageBody)
{
SessionId = sessionId
};
await sender.SendMessageAsync(message);
Console.WriteLine($"Message sent with SessionId: {sessionId}");
}
}
finally
{
await sender.CloseAsync();
await serviceBusClient.DisposeAsync();
}
}
static async Task ProcessMessagesAsync(ProcessSessionMessageEventArgs args)
{
try
{
var session = args.SessionId;
var message = args.Message;
Console.WriteLine($"Received message: SessionId = {session}, SequenceNumber = {message.SequenceNumber}");
await Task.Delay(TimeSpan.FromSeconds(10));
await args.CompleteMessageAsync(message);
Console.WriteLine($"Completed message: SessionId = {session}, SequenceNumber = {message.SequenceNumber}");
}
catch (Exception ex)
{
Console.WriteLine($"Error processing message: {ex.Message}");
}
}
static Task ExceptionReceivedHandler(ProcessErrorEventArgs args)
{
Console.WriteLine($"Message handler encountered an exception: {args.Exception}");
return Task.CompletedTask;
}
}
运行成功如下。