Reading x messages from a Service Bus queue using .NET Core 3.1


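I have the code below for reading x messages from a Service Bus queue, but it feels a bit clunky, especially when it comes to detecting that there is no more data on the queue.

Has anyone done this in a better way?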

static async Task<List<string>> ReceiveMessagesAsync(int messageCount)    
{
    // Single result list of message bodies, declared outside the try block so it can be returned
    var messages = new List<string>();
    var queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
    try
    {
        var options = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1
        };

        var startingAt = DateTime.Now;
        DateTime? latestMessageReadAt = null;
    
        queueClient.RegisterMessageHandler((message, cancellationToken) =>
        {
            var json = Encoding.UTF8.GetString(message.Body);
            messages.Add(json);

            latestMessageReadAt = DateTime.Now;
            return Task.CompletedTask;
        }, options);

        var allMessagesRead = false;
    
        // Wait for the desired number of messages to be received
        while (messages.Count < messageCount && allMessagesRead == false)
        {
            await Task.Delay(10); // Adjust the delay based on your requirements
        
            if (latestMessageReadAt != null)
            {
                // TotalSeconds is the whole elapsed time; Seconds is only the 0-59 component
                allMessagesRead = DateTime.Now.Subtract(latestMessageReadAt.Value).TotalSeconds > 10;
            }
            else
            {
                // TotalSeconds is the whole elapsed time; Seconds is only the 0-59 component
                allMessagesRead = DateTime.Now.Subtract(startingAt).TotalSeconds > 10;
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    } 
    finally
    {
        await queueClient.CloseAsync();
    }

    return messages;
}

static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    Console.WriteLine($"Message handler encountered an exception: {exceptionReceivedEventArgs.Exception}");
    var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
    Console.WriteLine($"Exception context for troubleshooting:");
    Console.WriteLine($"- Endpoint: {context.Endpoint}");
    Console.WriteLine($"- Entity Path: {context.EntityPath}");
    Console.WriteLine($"- Executing Action: {context.Action}");
    return Task.CompletedTask;
}

Paul

azure azure-servicebus-queues
1 Answer

I agree with Panagiotis Kanavas, and thank you: .NET Core 3.1 is no longer supported; use .NET 6.0, for which the Service Bus SDK now has long-term support.

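To count the messages received and close the receiver once enough have arrived, refer to the code below. It works in both .NET Core 3.1 and .NET 6.0 console applications: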

using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

class Program
{
    const string ServiceBusConnectionString = "Endpoint=sb://xxxsb65.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxbOm80x4=";
    const string QueueName = "myqueue";

    static async Task Main(string[] args)
    {
        await ReceiveAndProcessMessagesAsync(2); // Replace '2' with the number of messages you want to receive
    }

    static async Task ReceiveAndProcessMessagesAsync(int messageCount)
    {
        var messages = new List<string>();
        var queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
        var managementClient = new ManagementClient(ServiceBusConnectionString);

        try
        {
            queueClient.RegisterMessageHandler(async (message, cancellationToken) =>
            {
                var json = Encoding.UTF8.GetString(message.Body);
                messages.Add(json);

                Console.WriteLine($"Received message: {json}");

                if (messages.Count >= messageCount)
                {
                    await queueClient.CloseAsync();
                }
            },
            new MessageHandlerOptions(ExceptionReceivedHandler)
            {
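                MaxConcurrentCalls = 1,
                // With AutoComplete = false the handler must settle each message itself,
                // e.g. await queueClient.CompleteAsync(message.SystemProperties.LockToken);
                // otherwise the message is redelivered once its lock expires.
                AutoComplete = false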
            });

            while (true)
            {
                // Add a short delay before re-checking the message count
                await Task.Delay(1000); // Adjust the delay based on your requirements

                // Check if the desired number of messages have been received
                if (messages.Count >= messageCount)
                {
                    Console.WriteLine("Received the required number of messages.");
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            await queueClient.CloseAsync();
        }
    }

    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception: {exceptionReceivedEventArgs.Exception}");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine($"Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }
}

Output:

I called await ReceiveAndProcessMessagesAsync(2); to receive only 2 messages from the Service Bus queue.

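As a possible alternative to polling with Task.Delay in the handler-based version, here is a minimal sketch that signals completion through a TaskCompletionSource. MessageCollector, Add and WaitAsync are hypothetical names made up for illustration and are not part of any Service Bus SDK. The timeout here is an overall limit, not the "10 seconds since the last message" idle check from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not an SDK type): collects message bodies and
// completes a task as soon as the target count has been reached.
sealed class MessageCollector
{
    private readonly object gate = new object();
    private readonly List<string> bodies = new List<string>();
    private readonly int target;
    private readonly TaskCompletionSource<bool> done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public MessageCollector(int target)
    {
        this.target = target;
        if (target <= 0) done.TrySetResult(true); // nothing to wait for
    }

    // Call this from the message handler for every received body.
    public void Add(string body)
    {
        lock (gate)
        {
            bodies.Add(body);
            if (bodies.Count >= target) done.TrySetResult(true);
        }
    }

    // Waits until the target count is reached or the timeout elapses,
    // then returns a snapshot of whatever has been collected so far.
    public async Task<List<string>> WaitAsync(TimeSpan timeout)
    {
        await Task.WhenAny(done.Task, Task.Delay(timeout));
        lock (gate)
        {
            return new List<string>(bodies);
        }
    }
}
```

Inside the message handler you would call collector.Add(json), and the calling method would replace its while loop with something like var result = await collector.WaitAsync(TimeSpan.FromSeconds(10)); before closing the queue client. If the queue holds fewer messages than requested, the wait ends at the timeout and does not loop forever.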

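Following Panagiotis Kanavas's comment, you can receive the messages in batches with the code below, which uses ServiceBusClient from the newer Azure.Messaging.ServiceBus package: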

    static async Task ReceiveAndProcessMessagesAsync(int messageCount)
    {
        var messages = new List<string>();
        await using var client = new ServiceBusClient(ServiceBusConnectionString);
        var receiver = client.CreateReceiver(QueueName);

        try
        {
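            // Keep receiving until the requested number of messages has been collected
            while (messages.Count < messageCount)
            {
                // Receive a batch of at most the number of messages still needed
                IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveMessagesAsync(messageCount - messages.Count);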

                if (receivedMessages.Count == 0)
                {
                    Console.WriteLine("No more messages on the queue.");
                    break;
                }

                foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
                {
                    // Body is a BinaryData; ToString() decodes it as UTF-8 text
                    string body = receivedMessage.Body.ToString();
                    messages.Add(body);
                    Console.WriteLine($"Received message: {body}");

                    // Complete the message if needed
                    // await receiver.CompleteMessageAsync(receivedMessage);

                    if (messages.Count >= messageCount)
                    {
                        break; // Exit the loop if the desired message count is reached
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            await receiver.CloseAsync();
        }
    }
}
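To detect an empty queue without a long wait, the batch receive call also accepts a maximum wait time. Below is a minimal sketch of that idea, assuming the Azure.Messaging.ServiceBus package. QueueReader and ReadUpToAsync are hypothetical names, and completing each message is an assumption about what you want; leave it out if the messages should stay on the queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical wrapper, for illustration only.
static class QueueReader
{
    // Reads up to maxMessages bodies and stops early when no message
    // arrives within maxWait, which is treated as "queue is empty".
    public static async Task<List<string>> ReadUpToAsync(
        string connectionString, string queueName, int maxMessages, TimeSpan maxWait)
    {
        var bodies = new List<string>();
        await using var client = new ServiceBusClient(connectionString);
        await using var receiver = client.CreateReceiver(queueName);

        while (bodies.Count < maxMessages)
        {
            // Ask only for the messages still needed; returns an empty list
            // if nothing arrives before maxWait elapses.
            var batch = await receiver.ReceiveMessagesAsync(maxMessages - bodies.Count, maxWait);
            if (batch.Count == 0)
            {
                break;
            }

            foreach (var message in batch)
            {
                bodies.Add(message.Body.ToString());
                // Assumption: the default PeekLock mode is in use, so settle the
                // message here to prevent it from being redelivered later.
                await receiver.CompleteMessageAsync(message);
            }
        }

        return bodies;
    }
}
```

A call such as var bodies = await QueueReader.ReadUpToAsync(ServiceBusConnectionString, QueueName, 2, TimeSpan.FromSeconds(10)); would then return at most 2 bodies, or fewer if the queue stays quiet for 10 seconds.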