我创建了一个简单的 Windows 服务来使用来自 Azure 服务总线队列的消息。我使用
Topshelf
创建 Windows 服务。
下面的代码片段遵循 Microsoft Learn 中的示例:快速入门:从 Azure 服务总线队列 (.NET) 发送和接收消息。
var hf = HostFactory.New(x =>
{
x.Service<ServiceBusHelper>(s =>
{
s.ConstructUsing(serviceProvider.GetService<ServiceBusHelper>);
s.WhenStarted(async service => await service.ReceiveMessagesAsync());
s.WhenStopped(async service => await service.Stop());
});
x.RunAsNetworkService()
.StartAutomatically()
.EnableServiceRecovery(rc => rc.RestartService(1));
x.SetServiceName("MyWindowsService");
x.SetDisplayName("MyWindowsService");
x.SetDescription("MyWindowsService");
});
hf.Run();
ServiceBusHelper class:
public async Task ReceiveMessagesAsync()
{
var connectionString = _configuration.GetValue<string>("ServiceBusConnectionString");
var queueName = _configuration.GetValue<string>("ServiceBusQueueName");
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
ServiceBusProcessor processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
await processor.StartProcessingAsync();
System.Threading.Thread.Sleep(1000);//Wait for a minute before stop processing
await processor.StopProcessingAsync();
}
}
public async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
var messageBytes = Encoding.ASCII.GetBytes(body);
ProcessMessage(messageBytes);
await args.CompleteMessageAsync(args.Message);
}
public Task ErrorHandler(ProcessErrorEventArgs args)
{
return Task.CompletedTask;
}
public Task Stop()
{
return Task.CompletedTask;
}
Window服务安装成功,状态显示正在运行。但是,它不会自动消耗来自服务总线的消息。
如果我手动停止并启动服务,它将从队列中获取消息。
不确定这个实现中我缺少什么。
.NetCore 3.1 引入了一个新的扩展,可以与 Microsoft.AspNetCore.Hosting 一起工作 添加 NuGet 包 Microsoft.Extensions.Hosting.WindowsServices 你可以加 .UseWindowsService()。这将允许您将其作为 Windows 服务或控制台应用程序运行。
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseWindowsService()
.ConfigureAppConfiguration((context, config) =>
{
// configure the app here.
})
.ConfigureServices((hostContext, services) =>
{
services.AddHostedService<QueueWorker>();
}).UseSerilog();
}
然后您可以创建一个后台工作程序来启动和停止处理服务总线队列。这是我的实现:
public class QueueWorker : BackgroundService, IDisposable
{
protected ILogger<QueueWorker> _logger;
protected IQueueMessageReceiver _queueProcessor;
public QueueWorker()
{
}
public QueueWorker(ILogger<QueueWorker> logger, IQueueMessageReceiver queueMessageReceiver)
{
_logger = logger;
_queueProcessor = queueMessageReceiver;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.CompletedTask.ConfigureAwait(false);
}
public override Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Service Starting");
var task = _queueProcessor.StartProcessor(cancellationToken);
task.Wait();
if (task.IsFaulted)
{
throw new Exception("Unable to start Processor");
}
return base.StartAsync(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping Service");
await _queueProcessor.StopProcessor().ConfigureAwait(false);
await base.StopAsync(cancellationToken).ConfigureAwait(false);
}
public override void Dispose()
{
_logger.LogInformation("Disposing Service");
var loopCount = 0;
while (_queueProcessor != null && !_queueProcessor.IsClosedOrClosing() && loopCount < 5)
{
var task = Task.Delay(600);
task.Wait();
loopCount++;
}
base.Dispose();
GC.SuppressFinalize(this);
}
实际处理器:
public class QueueMessageReceiver : IQueueMessageReceiver
{
private readonly ServiceBusClient _queueClient;
private ServiceBusProcessor _processor;
private readonly ReceiverConfiguration _configuration;
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private Dictionary<string, string> _executionMatrix;
private readonly IServiceProvider _provider;
private CancellationToken _cancellationToken;
public QueueMessageReceiver(ReceiverConfiguration configuration, ILogger<QueueMessageReceiver> logger, IExecutionMatrix executionMatrix, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
if (configuration == null) throw new ArgumentException($"Configuration is missing from the expected ");
_configuration = configuration;
_logger = logger;
_loggerFactory = loggerFactory;
_executionMatrix = executionMatrix.GetExecutionMatrix();
_provider = serviceProvider;
_queueClient = new ServiceBusClient(_configuration.ConnectionString);
if (string.IsNullOrWhiteSpace(configuration.ConnectionString)) throw new ArgumentException($"ServiceBusConnectionString Object missing from the expected configuration under ConnectionStrings ");
if (configuration.QueueName == null) throw new ArgumentException($"Queue Name value missing from the expected configuration");
}
public async Task StartProcessor(CancellationToken cancellationToken)
{
if (!IsClosedOrClosing())
{
throw new FatalSystemException("ServiceBusProcessor is already running. ");
}
_cancellationToken = cancellationToken;
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = _configuration.AutoComplete,
MaxConcurrentCalls = _configuration.MaxConcurrentCalls,
MaxAutoLockRenewalDuration = _configuration.MaxAutoRenewDuration
};
_processor = _queueClient.CreateProcessor(_configuration.QueueName, options);
_processor.ProcessMessageAsync += ProcessMessagesAsync;
_processor.ProcessErrorAsync += ProcessErrorAsync;
await _processor.StartProcessingAsync().ConfigureAwait(false);
}
public async Task StopProcessor()
{
await _processor.StopProcessingAsync();
await _processor.CloseAsync();
}
private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception, "Uncaught handled exception", args.ErrorSource, args.FullyQualifiedNamespace, args.EntityPath);
return Task.CompletedTask;
}
private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
{
var message = args.Message;
// Process the message.
var sbMessage = $"Received message: SequenceNumber:{message.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}";
_logger.LogInformation(sbMessage);
//Handle your message
}
public bool IsClosedOrClosing()
{
return ((_processor == null) || _processor.IsClosed || !_processor.IsProcessing);
}
}