Azure Service Bus - Processing multiple messages in parallel (MaxConcurrentCallsPerSession)

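I am trying to process multiple messages in parallel from a session-enabled queue. For example, I tried setting MaxConcurrentCallsPerSession to 5, but I still receive only one message at a time.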

I wrote a console application to demonstrate what I am trying to do:

        static void Main()
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            //create the queue
            await CreateQueue();

            //initialize queue client
            ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
            {
                TransportType = ServiceBusTransportType.AmqpWebSockets,
            });

            //initialize the sender
            ServiceBusSender sender = queueClient.CreateSender(_queueName);

            //queue 3 messages
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "1" });
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "2" });
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "3" });

            //initialize processor
            ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
            {
                AutoCompleteMessages = false,
                ReceiveMode = ServiceBusReceiveMode.PeekLock,
                SessionIds = { _sessionId },
                PrefetchCount = 5,
                MaxConcurrentCallsPerSession = 5
            });

            //add message handler
            processor.ProcessMessageAsync += HandleReceivedMessage;

            //add error handler
            processor.ProcessErrorAsync += ErrorHandler;

            //start the processor
            await processor.StartProcessingAsync();

            Console.ReadLine();
        }

        static async Task CreateQueue()
        {
            ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);

            bool doesQueueExist = await client.QueueExistsAsync(_queueName);

            //check if the queue exists, if not then create one
            if (!doesQueueExist)
            {
                _ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
                {
                    RequiresSession = true,
                    DeadLetteringOnMessageExpiration = true,
                    MaxDeliveryCount = 3,
                    EnableBatchedOperations = true,
                });
            }
        }

        static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
        {
            Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);

            await Task.Delay(5000).ConfigureAwait(false);

            await sessionMessage.CompleteMessageAsync(sessionMessage.Message);

            Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
        }

        static Task ErrorHandler(ProcessErrorEventArgs e)
        {
            Console.WriteLine("Error received");

            return Task.CompletedTask;
        }

When I run the program, I expect to see:

Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3

But what I actually get is:

Received message: 1
Completed message: 1
Received message: 2
Completed message: 2
Received message: 3
Completed message: 3
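
For reference, the two orderings above correspond to handlers being awaited one at a time versus being started together. The following is a minimal local sketch that reproduces both patterns without Service Bus; the class `OrderingSketch`, the helper `Handle`, and the 100 ms delay are hypothetical stand-ins for the real handler, not part of the Azure SDK, and the sketch says nothing about how the SDK schedules calls internally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderingSketch
{
    // Hypothetical stand-in for the message handler: log, wait, log again.
    private static async Task Handle(string id, List<string> log)
    {
        lock (log) { log.Add("Received message: " + id); }
        await Task.Delay(100);
        lock (log) { log.Add("Completed message: " + id); }
    }

    // Each handler is awaited before the next one starts.
    public static async Task<List<string>> RunSequential()
    {
        var log = new List<string>();
        foreach (var id in new[] { "1", "2", "3" })
        {
            await Handle(id, log);
        }
        return log;
    }

    // All handlers are started first, then awaited together.
    public static async Task<List<string>> RunConcurrent()
    {
        var log = new List<string>();
        var tasks = new List<Task>();
        foreach (var id in new[] { "1", "2", "3" })
        {
            tasks.Add(Handle(id, log));
        }
        await Task.WhenAll(tasks);
        return log;
    }

    public static void Main()
    {
        Console.WriteLine("Sequential:");
        Console.WriteLine(string.Join(Environment.NewLine, RunSequential().GetAwaiter().GetResult()));
        Console.WriteLine("Concurrent:");
        Console.WriteLine(string.Join(Environment.NewLine, RunConcurrent().GetAwaiter().GetResult()));
    }
}
```

The sequential run yields the Received/Completed pairs one after another, like the output I actually get. In the concurrent run all three handlers are started before any delay elapses, which is the pattern I expected; the order of the Completed lines there can vary because the delays are equal.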

Is what I am trying to achieve possible?

I am using .NET Framework 4.7.2 and Azure.Messaging.ServiceBus 7.17.4.

azure azureservicebus azure-servicebus-queues
1 Answer

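I used the code below with .NET Framework 4.7.2 and the Azure.Messaging.ServiceBus 7.17.4 package to receive messages from an Azure Service Bus queue. Note that it differs from the setup in the question: each message is sent to its own session ("1", "2", "3"), and a separate session processor with MaxConcurrentCallsPerSession = 1 is started for each session, so the parallelism comes from running across sessions rather than within a single session.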

Code:

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp75
{
    class Program
    {
        private const string _serviceBusConnectionString = "<ServiceBus_Connection_String>";
        private const string _queueName = "<queue_name>";

        static async Task MainAsync()
        {
            await CreateQueue();
            ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
            {
                TransportType = ServiceBusTransportType.AmqpWebSockets,
            });

            ServiceBusSender sender = queueClient.CreateSender(_queueName);
            var processors = new List<ServiceBusSessionProcessor>();

            try
            {
                var processorTasks = new List<Task>();
                for (int i = 0; i < 3; i++)
                {
                    ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
                    {
                        AutoCompleteMessages = false,
                        ReceiveMode = ServiceBusReceiveMode.PeekLock,
                        SessionIds = { (i + 1).ToString() }, // bind this processor to session "1", "2", or "3"
                        PrefetchCount = 5,
                        MaxConcurrentCallsPerSession = 1 // one message at a time within each session
                    });

                    processor.ProcessMessageAsync += HandleReceivedMessage;
                    processor.ProcessErrorAsync += ErrorHandler;
                    processors.Add(processor);
                    processorTasks.Add(processor.StartProcessingAsync());
                }
                await Task.WhenAll(processorTasks);

                await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "1", MessageId = "1" });
                await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "2", MessageId = "2" });
                await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "3", MessageId = "3" });

                Console.ReadLine();
            }
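            finally
            {
                // Stop and dispose every processor, then release the sender and the client.
                foreach (var processor in processors)
                {
                    await processor.StopProcessingAsync();
                    await processor.DisposeAsync();
                }
                await sender.DisposeAsync();
                await queueClient.DisposeAsync();
            }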
        }

        static async Task CreateQueue()
        {
            ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);
            bool doesQueueExist = await client.QueueExistsAsync(_queueName);
            if (!doesQueueExist)
            {
                _ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
                {
                    RequiresSession = true, 
                    DeadLetteringOnMessageExpiration = true,
                    MaxDeliveryCount = 3,
                    EnableBatchedOperations = true,
                });
            }
        }

        static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
        {
            Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);
            await Task.Delay(5000).ConfigureAwait(false);
            await sessionMessage.CompleteMessageAsync(sessionMessage.Message);
            Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
        }

        static Task ErrorHandler(ProcessErrorEventArgs e)
        {
            Console.WriteLine($"Error received. Error source: {e.ErrorSource}, Exception: {e.Exception}");
            return Task.CompletedTask;
        }

        static void Main(string[] args)
        {
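            // GetAwaiter().GetResult() rethrows the original exception instead of wrapping it in an AggregateException.
            MainAsync().GetAwaiter().GetResult();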
        }
    }
}

Output:

The code above ran successfully, and I received exactly the expected messages in the output, as shown below.

Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3
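
As a possible variation on the approach above (a sketch that has not been run against a live namespace): instead of creating one processor per session, a single ServiceBusSessionProcessor can be allowed to work on several sessions at once through the MaxConcurrentSessions option. The connection string and queue name are placeholders, and the names SingleProcessorSketch, BuildOptions, and RunAsync are made up for illustration. It assumes messages are spread across different session IDs, as in the code above.

```csharp
using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;

public static class SingleProcessorSketch
{
    // Placeholders: replace with real values before running.
    private const string ConnectionString = "<connection_string>";
    private const string QueueName = "<queue_name>";

    // Options that let one processor handle up to three sessions in parallel,
    // with one message at a time inside each session.
    public static ServiceBusSessionProcessorOptions BuildOptions()
    {
        return new ServiceBusSessionProcessorOptions
        {
            AutoCompleteMessages = false,
            ReceiveMode = ServiceBusReceiveMode.PeekLock,
            MaxConcurrentSessions = 3,
            MaxConcurrentCallsPerSession = 1
        };
    }

    public static async Task RunAsync()
    {
        var client = new ServiceBusClient(ConnectionString);
        var processor = client.CreateSessionProcessor(QueueName, BuildOptions());

        processor.ProcessMessageAsync += async args =>
        {
            Console.WriteLine("Received message: " + args.Message.MessageId);
            await Task.Delay(5000); // simulated work, as in the handler above
            await args.CompleteMessageAsync(args.Message);
            Console.WriteLine("Completed message: " + args.Message.MessageId);
        };
        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine("Error received: " + args.Exception);
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        Console.ReadLine(); // keep processing until Enter is pressed

        await processor.StopProcessingAsync();
        await processor.DisposeAsync();
        await client.DisposeAsync();
    }

    public static void Main()
    {
        RunAsync().GetAwaiter().GetResult();
    }
}
```

Because no SessionIds are specified here, the processor accepts whichever sessions are available, up to the MaxConcurrentSessions limit. Messages that share a session ID are still handled one at a time with this configuration.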