我正在尝试在 Blazor Server Web 应用程序中实现消息接收器。
此操作的代码非常简单。
我的实现目标是加载该特定页面的每个客户端都应该接收消息,但实际上并不是这样工作的。
如果两个客户端正在加载同一页面,则其中一个正在收到消息。下次发送消息时,可能是另一个页面接收到。
有件事告诉我,这是因为注入的服务是单例服务,并且消息处理程序连接两次并没有帮助。
我不是这方面的专家,所以这将是我最好的猜测,但我希望有人可以指导我进行正确的实施,以获得所需的结果。
我尝试在范围内检索页面上的服务(使用 servicefactory 但这根本不起作用。
我已将这个 MSDOC 用于 .NET 开发人员的 Azure 服务总线客户端库。
、发送方、接收方和处理器可以安全地缓存并在应用程序的生命周期内作为单例使用,这是定期发送或接收消息时的最佳实践。他们负责有效管理网络、CPU 和内存使用,努力在不活动期间保持较低的使用率。ServiceBusClient
这些类型是一次性的,需要调用
或DisposeAsync
来确保正确清理网络资源和其他非托管对象。需要注意的是,当CloseAsync
实例被处置时,它将自动关闭并清理使用它创建的任何发送者、接收者和处理器。ServiceBusClient
<!-- Pages/ServiceBus.razor -->
@page "/servicebus"
@using Azure.Messaging.ServiceBus
<h3>Service Bus Processor</h3>
<button @onclick="StartProcessing">Start Processing</button>
@if (receivedMessages.Any())
{
<h4>Received Messages:</h4>
<ul>
@foreach (var message in receivedMessages)
{
<li>@message</li>
}
</ul>
}
@code {
private ServiceBusProcessorService processorService;
private ServiceBusProcessor processor;
private List<string> receivedMessages = new List<string>();
private async Task StartProcessing()
{
// Initialize your Service Bus
string connectionString = "";
string topicName = "sampathpujari";
string subscriptionName = "sampathpujari";
var retryOptions = new ServiceBusRetryOptions
{
Mode = ServiceBusRetryMode.Exponential,
MaxRetries = 3,
Delay = TimeSpan.FromSeconds(2),
MaxDelay = TimeSpan.FromSeconds(30)
};
var serviceBusClient = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
RetryOptions = retryOptions
});
processorService = new ServiceBusProcessorService(serviceBusClient);
processor = processorService.GetProcessorForTopicAndSubscription(topicName, subscriptionName);
// Set up message handler
processor.ProcessMessageAsync += ProcessMessages;
// Set up error handler
processor.ProcessErrorAsync += ErrorHandler;
// Start processing
await processor.StartProcessingAsync();
}
private async Task ProcessMessages(ProcessMessageEventArgs args)
{
// Your message handling logic
var body = args.Message.Body.ToString();
Console.WriteLine($"Received message: {body}");
// Update the UI with the received message
await InvokeAsync(() =>
{
receivedMessages.Add(body);
StateHasChanged();
});
// Complete the message
await args.CompleteMessageAsync(args.Message);
}
private Task ErrorHandler(ProcessErrorEventArgs args)
{
// Your error handling logic
Console.WriteLine($"Error source: {args.Exception.Source}, Exception: {args.Exception.Message}");
// Log the error or take appropriate action
return Task.CompletedTask;
}
}
// ServiceBusProcessorService.cs
using Azure.Messaging.ServiceBus;
public class ServiceBusProcessorService
{
private readonly ServiceBusClient _serviceBusClient;
public ServiceBusProcessorService(ServiceBusClient serviceBusClient)
{
_serviceBusClient = serviceBusClient;
}
public ServiceBusProcessor GetProcessorForTopicAndSubscription(string topicName, string subscriptionName)
{
return _serviceBusClient.CreateProcessor(topicName, subscriptionName);
}
}
<!-- Pages/Index.razor -->
@page "/"
<h1>Hello, world!</h1>
<a href="/servicebus">Go to Service Bus</a>