我有一个 ASP.NET Core 应用程序,我想在其中消费 RabbitMQ 消息。
我已经在命令行应用程序中成功设置了发布者和消费者,但我不确定如何在 Web 应用程序中正确设置它。
我想在
Startup.cs
中初始化它,但显然一旦启动完成,它就会被销毁。
如何在 Web 应用程序中以正确的方式初始化消费者?
对消费者/侦听器使用单例模式以在应用程序运行时保留它。使用
IApplicationLifetime
接口在应用程序启动/停止时启动/停止消费者。
/// <summary>
/// Application startup: registers the RabbitMQ listener in the container
/// and hooks it into the request pipeline builder.
/// </summary>
public class Startup
{
    /// <summary>Registers <see cref="RabbitListener"/> as a singleton service.</summary>
    /// <param name="services">The service collection to add registrations to.</param>
    public void ConfigureServices(IServiceCollection services)
        => services.AddSingleton<RabbitListener>();

    /// <summary>Wires the listener to the application via <c>UseRabbitListener</c>.</summary>
    /// <param name="app">The application builder.</param>
    public void Configure(IApplicationBuilder app)
        => app.UseRabbitListener();
}
/// <summary>
/// Extension methods that tie a <see cref="RabbitListener"/> to the application lifetime.
/// </summary>
public static class ApplicationBuilderExtentions
{
    /// <summary>
    /// Resolves the registered <see cref="RabbitListener"/> and arranges for it to be
    /// registered when the application has started and deregistered when it is stopping.
    /// </summary>
    /// <param name="app">The application builder whose service provider is used.</param>
    /// <returns>The same <paramref name="app"/> instance, to allow chaining.</returns>
    /// <exception cref="System.InvalidOperationException">
    /// Thrown by <c>GetRequiredService</c> when <see cref="RabbitListener"/> or
    /// <see cref="IApplicationLifetime"/> is not registered.
    /// </exception>
    public static IApplicationBuilder UseRabbitListener(this IApplicationBuilder app)
    {
        // GetRequiredService fails immediately with a descriptive error instead of
        // returning null and causing a NullReferenceException later inside a callback.
        var listener = app.ApplicationServices.GetRequiredService<RabbitListener>();
        var lifetime = app.ApplicationServices.GetRequiredService<IApplicationLifetime>();

        // The callbacks capture the listener directly, so no static mutable state is needed.
        lifetime.ApplicationStarted.Register(listener.Register);

        //press Ctrl+C to reproduce if your app runs in Kestrel as a console app
        lifetime.ApplicationStopping.Register(listener.Deregister);

        return app;
    }
}
这是我的监听器:
/// <summary>
/// Owns a RabbitMQ connection and channel and consumes messages from the "hello" queue.
/// </summary>
public class RabbitListener
{
    private readonly ConnectionFactory factory;
    private readonly IConnection connection;
    private readonly IModel channel;

    /// <summary>
    /// Creates the connection factory for "localhost" and opens a connection and a channel.
    /// </summary>
    public RabbitListener()
    {
        this.factory = new ConnectionFactory() { HostName = "localhost" };
        this.connection = factory.CreateConnection();
        this.channel = connection.CreateModel();
    }

    /// <summary>
    /// Declares the "hello" queue and starts consuming from it with automatic acknowledgement.
    /// </summary>
    public void Register()
    {
        channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            // ToArray() matches the other consumers in this file and is required when
            // Body is ReadOnlyMemory<byte>, which Encoding.GetString does not accept.
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            // TODO: process `message` here.
        };

        channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
    }

    /// <summary>
    /// Closes the channel and the connection if they are still open, so calling this
    /// more than once does not attempt to close an already closed connection.
    /// </summary>
    public void Deregister()
    {
        if (this.channel.IsOpen)
        {
            this.channel.Close();
        }

        if (this.connection.IsOpen)
        {
            this.connection.Close();
        }
    }
}
另一个选项是托管服务。
您可以创建一个 HostedService 并调用方法来注册
RabbitMq
监听器。
/// <summary>
/// Contract for a message consumer. In this file it is implemented by
/// <c>ConsumerService</c> and invoked from <c>ConsumerHostedService.ExecuteAsync</c>.
/// </summary>
public interface IConsumerService
{
/// <summary>Starts reading messages.</summary>
/// <returns>A task representing the operation.</returns>
// NOTE(review): the name is misspelled ("Messgaes"); renaming it would break implementers and callers.
Task ReadMessgaes();
}
/// <summary>
/// Consumes messages from a durable queue bound to a fanout exchange and
/// acknowledges each message after writing its text to the console.
/// </summary>
public class ConsumerService : IConsumerService, IDisposable
{
    private const string QueueName = "your.queue.name";
    private const string ExchangeName = "your.exchange.name";

    private readonly IModel _model;
    private readonly IConnection _connection;

    /// <summary>
    /// Opens a connection and a channel, then declares the queue and exchange and binds them.
    /// </summary>
    /// <param name="rabbitMqService">Service used to obtain the RabbitMQ connection.</param>
    public ConsumerService(IRabbitMqService rabbitMqService)
    {
        _connection = rabbitMqService.CreateChannel();
        try
        {
            _model = _connection.CreateModel();
            _model.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
            _model.ExchangeDeclare(ExchangeName, ExchangeType.Fanout, durable: true, autoDelete: false);
            _model.QueueBind(QueueName, ExchangeName, string.Empty);
        }
        catch
        {
            // The container never receives a half-constructed instance, so release
            // the already opened connection (and channel, if any) before rethrowing.
            Dispose();
            throw;
        }
    }

    /// <summary>
    /// Registers an asynchronous consumer on the queue with manual acknowledgement.
    /// </summary>
    /// <returns>A completed task; messages are delivered later through the consumer callback.</returns>
    public Task ReadMessgaes()
    {
        var consumer = new AsyncEventingBasicConsumer(_model);
        consumer.Received += (ch, ea) =>
        {
            var body = ea.Body.ToArray();
            var text = System.Text.Encoding.UTF8.GetString(body);
            Console.WriteLine(text);

            // Acknowledge only this delivery (multiple: false).
            _model.BasicAck(ea.DeliveryTag, false);
            return Task.CompletedTask;
        };

        _model.BasicConsume(QueueName, false, consumer);
        return Task.CompletedTask;
    }

    /// <summary>Closes and disposes the channel and the connection.</summary>
    public void Dispose()
    {
        // _model may be null when called from the constructor's failure path.
        if (_model != null)
        {
            if (_model.IsOpen)
            {
                _model.Close();
            }

            _model.Dispose();
        }

        if (_connection != null)
        {
            if (_connection.IsOpen)
            {
                _connection.Close();
            }

            _connection.Dispose();
        }
    }
}
RabbitMqService:
/// <summary>Factory abstraction for obtaining a RabbitMQ connection.</summary>
public interface IRabbitMqService
{
/// <summary>
/// Creates a connection. Note that, despite the method name, the declared
/// return type is <c>IConnection</c>, not a channel.
/// </summary>
/// <returns>The created <c>IConnection</c>.</returns>
IConnection CreateChannel();
}
/// <summary>
/// Builds RabbitMQ connections from the configured host name and credentials.
/// </summary>
public class RabbitMqService : IRabbitMqService
{
    private readonly RabbitMqConfiguration _settings;

    /// <summary>Captures the options value for later use.</summary>
    /// <param name="options">Options wrapper carrying the RabbitMQ settings.</param>
    public RabbitMqService(IOptions<RabbitMqConfiguration> options) => _settings = options.Value;

    /// <summary>
    /// Creates a connection factory from the settings, enables asynchronous
    /// consumer dispatch, and opens a new connection.
    /// </summary>
    /// <returns>The newly opened connection.</returns>
    public IConnection CreateChannel()
    {
        var factory = new ConnectionFactory
        {
            UserName = _settings.Username,
            Password = _settings.Password,
            HostName = _settings.HostName,
            DispatchConsumersAsync = true
        };

        return factory.CreateConnection();
    }
}
最后创建一个 HostedService 并调用 ReadMessgaes 方法进行注册:
/// <summary>
/// Background service that starts message consumption through an <see cref="IConsumerService"/>.
/// </summary>
public class ConsumerHostedService : BackgroundService
{
    private readonly IConsumerService _consumer;

    /// <summary>Stores the consumer that will be started by this service.</summary>
    /// <param name="consumerService">The consumer to start.</param>
    public ConsumerHostedService(IConsumerService consumerService)
    {
        _consumer = consumerService;
    }

    /// <summary>Awaits the consumer's <c>ReadMessgaes</c> call.</summary>
    /// <param name="stoppingToken">Stop signal supplied by the host; not used here.</param>
    /// <returns>A task that completes when <c>ReadMessgaes</c> completes.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var readTask = _consumer.ReadMessgaes();
        await readTask;
    }
}
注册服务:
// Connection factory service, shared for the lifetime of the application.
// NOTE(review): RabbitMqService takes IOptions<RabbitMqConfiguration>; the options binding
// (e.g. services.Configure<RabbitMqConfiguration>(...)) is not shown here — confirm it is registered.
services.AddSingleton<IRabbitMqService, RabbitMqService>();
// Single consumer instance; it holds the connection and channel it creates.
services.AddSingleton<IConsumerService, ConsumerService>();
// Hosted service that calls the consumer's ReadMessgaes method in ExecuteAsync.
services.AddHostedService<ConsumerHostedService>();
在这种情况下,当应用程序停止时,您的消费者将自动停止。
附加信息:
appsettings.json:
{
"RabbitMqConfiguration": {
"HostName": "localhost",
"Username": "guest",
"Password": "guest"
}
}
RabbitMq配置
/// <summary>
/// Settings read by <c>RabbitMqService.CreateChannel</c> when building the connection factory.
/// </summary>
// NOTE(review): presumably bound from the "RabbitMqConfiguration" section of appsettings.json — confirm the binding.
public class RabbitMqConfiguration
{
/// <summary>Host name assigned to the connection factory.</summary>
public string HostName { get; set; }
/// <summary>User name assigned to the connection factory.</summary>
public string Username { get; set; }
/// <summary>Password assigned to the connection factory.</summary>
public string Password { get; set; }
}
我发现最好的方法之一是使用
BackgroundService
/// <summary>
/// Background service that consumes messages from the "queue" queue,
/// acknowledges each one, and writes its text to the console.
/// </summary>
public class TempConsumer : BackgroundService
{
    private readonly ConnectionFactory _factory;
    private readonly IConnection _connection;
    private readonly IModel _channel;

    /// <summary>
    /// Opens the connection and channel and declares the non-durable "queue" queue.
    /// </summary>
    public TempConsumer()
    {
        _factory = new ConnectionFactory()
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "password",
            VirtualHost = "/",
        };
        _connection = _factory.CreateConnection();
        // Subscribe the shutdown handler; it was previously declared but never attached.
        _connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: "queue",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);
    }

    /// <summary>
    /// Registers an event-based consumer with manual acknowledgement and returns immediately.
    /// </summary>
    /// <param name="stoppingToken">Stop signal; checked once before the consumer is registered.</param>
    /// <returns>A completed task; messages arrive later through the consumer callback.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Shutdown += OnConsumerShutdown;
        consumer.Registered += OnConsumerRegistered;
        consumer.Unregistered += OnConsumerUnregistered;
        consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
        consumer.Received += (model, ea) =>
        {
            Console.WriteLine("Recieved");
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            // Acknowledge only this delivery.
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            Console.WriteLine(message);
        };

        _channel.BasicConsume(queue: "queue",
            autoAck: false,
            consumer: consumer);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Closes the channel and connection if they are still open, then disposes the base service.
    /// </summary>
    public override void Dispose()
    {
        if (_channel.IsOpen)
        {
            _channel.Close();
        }

        if (_connection.IsOpen)
        {
            _connection.Close();
        }

        base.Dispose();
    }

    // Placeholder event handlers; intentionally empty.
    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
}
然后,将消费者注册为托管服务
// Register the BackgroundService-based consumer so the host starts and stops it.
services.AddHostedService<TempConsumer>();