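I have the code below for reading x messages from a Service Bus queue, but it feels a bit clunky, particularly when detecting that there is no more data on the queue.
Has anyone done this in a better way?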
static async Task<List<string>> ReceiveMessagesAsync(int messageCount)
{
var messages = new List<string>();
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
try
{
var options = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1
};
var startingAt = DateTime.Now;
DateTime? latestMessageReadAt = null;
queueClient.RegisterMessageHandler((message, cancellationToken) =>
{
var json = Encoding.UTF8.GetString(message.Body);
messages.Add(json);
latestMessageReadAt = DateTime.Now;
return Task.CompletedTask;
}, options);
var allMessagesRead = false;
// Wait for the desired number of messages to be received
while (messages.Count < messageCount && allMessagesRead == false)
{
await Task.Delay(10); // Adjust the delay based on your requirements
if (latestMessageReadAt != null)
{
// TotalSeconds is the whole interval; Seconds is only its 0-59 component
allMessagesRead = DateTime.Now.Subtract(latestMessageReadAt.Value).Duration().TotalSeconds > 10;
}
else
{
allMessagesRead = DateTime.Now.Subtract(startingAt).Duration().TotalSeconds > 10;
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
await queueClient.CloseAsync();
}
return messages;
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception: {exceptionReceivedEventArgs.Exception}");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine($"Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
Paul
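I agree with, and thank, Panagiotis Kanavas.
.NET Core 3.1 is no longer supported, so use .NET 6.0; the Service Bus SDK now has long-term support.
To detect how many messages have been received and then close the receiver, refer to the code below. It works in both .NET Core 3.1 and .NET 6.0 console applications: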
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
class Program
{
const string ServiceBusConnectionString = "Endpoint=sb://xxxsb65.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxbOm80x4=";
const string QueueName = "myqueue";
static async Task Main(string[] args)
{
await ReceiveAndProcessMessagesAsync(2); // Replace '2' with the number of messages you want to receive
}
static async Task ReceiveAndProcessMessagesAsync(int messageCount)
{
var messages = new List<string>();
var queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
try
{
queueClient.RegisterMessageHandler(async (message, cancellationToken) =>
{
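var json = Encoding.UTF8.GetString(message.Body);
messages.Add(json);
Console.WriteLine($"Received message: {json}");
// AutoComplete is false, so settle the message explicitly; otherwise it is redelivered once its lock expires
await queueClient.CompleteAsync(message.SystemProperties.LockToken);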
if (messages.Count >= messageCount)
{
await queueClient.CloseAsync();
}
},
new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
});
while (true)
{
// Add a short delay before re-checking the message count
await Task.Delay(1000); // Adjust the delay based on your requirements
// Check if the desired number of messages have been received
if (messages.Count >= messageCount)
{
Console.WriteLine("Received the required number of messages.");
break;
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
await queueClient.CloseAsync();
}
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception: {exceptionReceivedEventArgs.Exception}");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine($"Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
I call
await ReceiveAndProcessMessagesAsync(2);
so that only 2 messages are received from the Service Bus queue.
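Following Panagiotis Kanavas's comment, you can implement batching with the code below (ServiceBusClient and ServiceBusReceivedMessage come from the Azure.Messaging.ServiceBus package):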
static async Task ReceiveAndProcessMessagesAsync(int messageCount)
{
var messages = new List<string>();
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var receiver = client.CreateReceiver(QueueName);
try
{
while (messages.Count < messageCount)
{
// Request only the messages still needed and wait at most 10 seconds for them to arrive
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveMessagesAsync(messageCount - messages.Count, TimeSpan.FromSeconds(10));
if (receivedMessages.Count == 0)
{
Console.WriteLine("No more messages on the queue.");
break;
}
foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
{
string body = Encoding.UTF8.GetString(receivedMessage.Body);
messages.Add(body);
Console.WriteLine($"Received message: {body}");
// Complete the message if needed
// await receiver.CompleteMessageAsync(receivedMessage);
if (messages.Count >= messageCount)
{
break; // Exit the loop if the desired message count is reached
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
await receiver.CloseAsync();
}
}
}
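If you want the "stop at N messages, or when the queue goes quiet" rule in one place, here is a minimal sketch of that loop pulled out of the SDK so it can be exercised without a live queue. QueueDrain and TakeUpToAsync are hypothetical names of my own, not part of any Service Bus package, and the sketch assumes that an empty (or null) batch means nothing arrived within the wait time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class QueueDrain
{
    // Hypothetical helper (not an SDK API): collects up to maxTotal items and
    // stops early when a receive call yields nothing within maxWait.
    public static async Task<List<T>> TakeUpToAsync<T>(
        Func<int, TimeSpan, Task<IReadOnlyList<T>>> receiveBatch,
        int maxTotal,
        TimeSpan maxWait)
    {
        var items = new List<T>();
        while (items.Count < maxTotal)
        {
            // Ask only for what is still missing.
            var batch = await receiveBatch(maxTotal - items.Count, maxWait);
            if (batch == null || batch.Count == 0)
            {
                break; // Nothing arrived in time: treat the queue as drained.
            }
            foreach (var item in batch)
            {
                // Defensive guard in case the callback returns more than requested.
                if (items.Count == maxTotal) break;
                items.Add(item);
            }
        }
        return items;
    }
}
```

With the receiver from the batching code above, the call would look roughly like QueueDrain.TakeUpToAsync<ServiceBusReceivedMessage>((max, wait) => receiver.ReceiveMessagesAsync(max, wait), messageCount, TimeSpan.FromSeconds(10)). You would still need to complete each message afterwards if the receiver is in peek-lock mode.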