我正在尝试并行处理来自启用了会话的队列的多条消息。例如,我尝试将 MaxConcurrentCallsPerSession 设置为 5,但我仍然一次收到 1 条消息。
我编写了一个控制台应用程序来演示我想要做什么:
static void Main()
{
MainAsync().Wait();
}
static async Task MainAsync()
{
//create the queue
await CreateQueue();
//initialize queue client
ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
});
//initialize the sender
ServiceBusSender sender = queueClient.CreateSender(_queueName);
//queue 3 messages
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "1" });
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "2" });
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "3" });
//initialize processor
ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
{
AutoCompleteMessages = false,
ReceiveMode = ServiceBusReceiveMode.PeekLock,
SessionIds = { _sessionId },
PrefetchCount = 5,
MaxConcurrentCallsPerSession = 5
});
//add message handler
processor.ProcessMessageAsync += HandleReceivedMessage;
//add error handler
processor.ProcessErrorAsync += ErrorHandler;
//start the processor
await processor.StartProcessingAsync();
Console.ReadLine();
}
static async Task CreateQueue()
{
ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);
bool doesQueueExist = await client.QueueExistsAsync(_queueName);
//check if the queue exists, if not then create one
if (!doesQueueExist)
{
_ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
{
RequiresSession = true,
DeadLetteringOnMessageExpiration = true,
MaxDeliveryCount = 3,
EnableBatchedOperations = true,
});
}
}
static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
{
Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);
await Task.Delay(5000).ConfigureAwait(false);
await sessionMessage.CompleteMessageAsync(sessionMessage.Message);
Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
}
static Task ErrorHandler(ProcessErrorEventArgs e)
{
Console.WriteLine("Error received");
return Task.CompletedTask;
}
执行程序时,我期望收到的是:
Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3
但是我得到的是:
Received message: 1
Completed message: 1
Received message: 2
Completed message: 2
Received message: 3
Completed message: 3
我想要实现的目标可能吗?
我正在使用.NetFramework 4.7.2和Azure.Messaging.ServiceBus 7.17.4
我尝试使用.Net Framework 4.7.2和Azure.Messaging.ServiceBus 7.1.7.4包使用下面的代码来从Azure服务总线队列接收消息。
代码:
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace ConsoleApp75
{
class Program
{
private const string _serviceBusConnectionString = "<ServiceBus_QueueConne_String>";
private const string _queueName = "<queue_name>";
static async Task MainAsync()
{
await CreateQueue();
ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
});
ServiceBusSender sender = queueClient.CreateSender(_queueName);
var processors = new List<ServiceBusSessionProcessor>();
try
{
var processorTasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
{
AutoCompleteMessages = false,
ReceiveMode = ServiceBusReceiveMode.PeekLock,
SessionIds = { (i + 1).ToString() },
PrefetchCount = 5,
MaxConcurrentCallsPerSession = 1
});
processor.ProcessMessageAsync += async args =>
{
await HandleReceivedMessage(args);
};
processor.ProcessErrorAsync += ErrorHandler;
processors.Add(processor);
processorTasks.Add(processor.StartProcessingAsync());
}
await Task.WhenAll(processorTasks);
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "1", MessageId = "1" });
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "2", MessageId = "2" });
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "3", MessageId = "3" });
Console.ReadLine();
}
finally
{
foreach (var processor in processors)
{
await processor.StopProcessingAsync();
}
}
}
static async Task CreateQueue()
{
ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);
bool doesQueueExist = await client.QueueExistsAsync(_queueName);
if (!doesQueueExist)
{
_ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
{
RequiresSession = true,
DeadLetteringOnMessageExpiration = true,
MaxDeliveryCount = 3,
EnableBatchedOperations = true,
});
}
}
static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
{
Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);
await Task.Delay(5000).ConfigureAwait(false);
await sessionMessage.CompleteMessageAsync(sessionMessage.Message);
Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
}
static Task ErrorHandler(ProcessErrorEventArgs e)
{
Console.WriteLine($"Error received. Error source: {e.ErrorSource}, Exception: {e.Exception}");
return Task.CompletedTask;
}
static void Main(string[] args)
{
MainAsync().Wait();
}
}
}
输出:
以下代码成功运行,我在输出中收到了确切的消息,如下所示。
Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3