在 ASP.NET Core 应用程序中设置 RabbitMQ 消费者

问题描述 投票:0回答:4

我有一个 ASP.NET Core 应用程序,我想在其中消费 RabbitMQ 消息。

我已经在命令行应用程序中成功设置了发布者和消费者,但我不确定如何在 Web 应用程序中正确设置它。

我想在

Startup.cs
中初始化它,但是一旦启动过程完成,它当然就会被销毁。

如何在 Web 应用程序中以正确的方式初始化消费者?

c# asp.net-core rabbitmq .net-core
4个回答
42
投票

对消费者/侦听器使用单例模式以在应用程序运行时保留它。使用

IApplicationLifetime
接口在应用程序启动/停止时启动/停止消费者。

/// <summary>
/// Application startup class: registers <see cref="RabbitListener"/> with the
/// service container and wires it up while the application pipeline is configured.
/// </summary>
public class Startup
{
    /// <summary>Registers <see cref="RabbitListener"/> as a singleton service.</summary>
    /// <param name="services">The service collection to add the registration to.</param>
    public void ConfigureServices(IServiceCollection services) =>
        services.AddSingleton<RabbitListener>();

    /// <summary>Invokes the <c>UseRabbitListener</c> extension on the application builder.</summary>
    /// <param name="app">The application builder being configured.</param>
    public void Configure(IApplicationBuilder app) =>
        app.UseRabbitListener();
}

/// <summary>
/// Extension methods that tie a <see cref="RabbitListener"/> to the application's
/// started/stopping lifetime notifications.
/// </summary>
public static class ApplicationBuilderExtentions
{
    //the simplest way to store a single long-living object, just for example.
    private static RabbitListener _listener { get; set; }

    /// <summary>
    /// Resolves the registered <see cref="RabbitListener"/> and registers callbacks so that
    /// <c>Register()</c> is called on ApplicationStarted and <c>Deregister()</c> on ApplicationStopping.
    /// </summary>
    /// <param name="app">The application builder whose service provider is used for resolution.</param>
    /// <returns>The same <paramref name="app"/> instance, so calls can be chained.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="app"/> is null.</exception>
    /// <exception cref="System.InvalidOperationException">
    /// <see cref="RabbitListener"/> or <c>IApplicationLifetime</c> is not registered.
    /// </exception>
    public static IApplicationBuilder UseRabbitListener(this IApplicationBuilder app)
    {
        if (app == null)
        {
            throw new System.ArgumentNullException(nameof(app));
        }

        // GetRequiredService fails immediately with a descriptive exception when the service
        // is missing; GetService would return null and only fail later inside a callback.
        _listener = app.ApplicationServices.GetRequiredService<RabbitListener>();

        var lifetime = app.ApplicationServices.GetRequiredService<IApplicationLifetime>();

        lifetime.ApplicationStarted.Register(OnStarted);

        //press Ctrl+C to reproduce if your app runs in Kestrel as a console app
        lifetime.ApplicationStopping.Register(OnStopping);

        return app;
    }

    // Callback for ApplicationStarted: starts consuming.
    private static void OnStarted()
    {
        _listener.Register();
    }

    // Callback for ApplicationStopping: stops the listener.
    private static void OnStopping()
    {
        _listener.Deregister();
    }
}
  • 您应该注意应用程序的托管环境。例如,IIS 可能会回收应用程序,从而导致您的代码停止运行。
  • 此模式可以扩展为监听器池。

17
投票

这是我的监听器:

/// <summary>
/// Listener that connects to a RabbitMQ broker on "localhost" and consumes
/// messages from the "hello" queue.
/// </summary>
public class RabbitListener
{
    private readonly ConnectionFactory _factory;
    private readonly IConnection _connection;
    private readonly IModel _channel;

    /// <summary>
    /// Creates the connection factory, opens a connection and creates a channel on it.
    /// </summary>
    public RabbitListener()
    {
        _factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = _factory.CreateConnection();
        _channel = _connection.CreateModel();
    }

    /// <summary>
    /// Declares the "hello" queue and starts an auto-acknowledging consumer on it.
    /// </summary>
    public void Register()
    {
        _channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            // Placeholder: the decoded message is not processed further in this sample.
        };
        _channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
    }

    /// <summary>
    /// Closes the channel and the connection if they are still open.
    /// </summary>
    public void Deregister()
    {
        // Guard with IsOpen so a connection that was already closed
        // (e.g. by the broker) does not make shutdown throw.
        if (_channel.IsOpen)
        {
            _channel.Close();
        }

        if (_connection.IsOpen)
        {
            _connection.Close();
        }
    }
}

8
投票

另一个选项是托管服务

您可以创建一个 HostedService 并调用方法来注册

RabbitMq
监听器。

/// <summary>
/// Contract for a service that reads messages from a queue.
/// </summary>
public interface IConsumerService
{
    /// <summary>
    /// Starts reading messages. The identifier's spelling ("Messgaes") is part of
    /// the declared interface and is used as-is by callers in this file.
    /// </summary>
    /// <returns>A task representing the operation.</returns>
    Task ReadMessgaes();
}

/// <summary>
/// Consumer bound to a durable queue on a fanout exchange. The connection is obtained
/// from <see cref="IRabbitMqService"/>; the channel and connection are closed on dispose.
/// </summary>
public class ConsumerService : IConsumerService, IDisposable
{
    const string _queueName = "your.queue.name";
    const string _exchangeName = "your.exchange.name";

    private readonly IConnection _connection;
    private readonly IModel _model;

    /// <summary>
    /// Opens a connection and channel, then declares the queue and the fanout exchange
    /// and binds them together with an empty routing key.
    /// </summary>
    /// <param name="rabbitMqService">Service used to create the broker connection.</param>
    public ConsumerService(IRabbitMqService rabbitMqService)
    {
        _connection = rabbitMqService.CreateChannel();
        _model = _connection.CreateModel();

        _model.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);
        _model.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, durable: true, autoDelete: false);
        _model.QueueBind(_queueName, _exchangeName, string.Empty);
    }

    /// <summary>
    /// Attaches an asynchronous consumer to the queue. Each delivery is decoded as UTF-8,
    /// written to the console, and then acknowledged individually.
    /// </summary>
    /// <returns>A task that completes once the consumer has been registered.</returns>
    public async Task ReadMessgaes()
    {
        var asyncConsumer = new AsyncEventingBasicConsumer(_model);

        asyncConsumer.Received += async (sender, delivery) =>
        {
            var payload = delivery.Body.ToArray();
            var decoded = System.Text.Encoding.UTF8.GetString(payload);

            Console.WriteLine(decoded);
            await Task.CompletedTask;

            // Manual acknowledgement: the consumer below is registered with autoAck disabled.
            _model.BasicAck(delivery.DeliveryTag, false);
        };

        _model.BasicConsume(_queueName, false, asyncConsumer);
        await Task.CompletedTask;
    }

    /// <summary>
    /// Closes the channel first and then the connection, each only if still open.
    /// </summary>
    public void Dispose()
    {
        if (_model.IsOpen)
        {
            _model.Close();
        }

        if (_connection.IsOpen)
        {
            _connection.Close();
        }
    }
}

RabbitMqService

/// <summary>
/// Factory abstraction for obtaining a RabbitMQ connection.
/// </summary>
public interface IRabbitMqService
{
    /// <summary>
    /// Creates a connection. Note that, despite the method name, the declared
    /// return type is <see cref="IConnection"/>, not a channel.
    /// </summary>
    /// <returns>The created <see cref="IConnection"/>.</returns>
    IConnection CreateChannel();
}

/// <summary>
/// Creates RabbitMQ connections from the values held in <see cref="RabbitMqConfiguration"/>.
/// </summary>
public class RabbitMqService : IRabbitMqService
{
    private readonly RabbitMqConfiguration _configuration;

    /// <summary>Captures the configuration value from the supplied options.</summary>
    /// <param name="options">Options wrapper holding the RabbitMQ settings.</param>
    public RabbitMqService(IOptions<RabbitMqConfiguration> options)
    {
        _configuration = options.Value;
    }

    /// <summary>
    /// Builds a connection factory from the configured user name, password and host name,
    /// with asynchronous consumer dispatch enabled, and opens a connection with it.
    /// </summary>
    /// <returns>The newly created connection (the method name says "channel", but an
    /// <see cref="IConnection"/> is what is returned).</returns>
    public IConnection CreateChannel()
    {
        var factory = new ConnectionFactory
        {
            UserName = _configuration.Username,
            Password = _configuration.Password,
            HostName = _configuration.HostName,
            DispatchConsumersAsync = true
        };

        return factory.CreateConnection();
    }
}

最后创建一个 HostedService,并调用 ReadMessgaes 方法(拼写与上面代码中的方法名保持一致)进行注册:

/// <summary>
/// Background service whose execution consists of asking the injected
/// <see cref="IConsumerService"/> to start reading messages.
/// </summary>
public class ConsumerHostedService : BackgroundService
{
    private readonly IConsumerService _consumerService;

    /// <summary>Stores the consumer service to be started by this hosted service.</summary>
    /// <param name="consumerService">The consumer to start.</param>
    public ConsumerHostedService(IConsumerService consumerService) =>
        _consumerService = consumerService;

    /// <summary>Awaits <c>ReadMessgaes()</c> on the consumer service.</summary>
    /// <param name="stoppingToken">Stop signal supplied by the host; not used by this method.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
        await _consumerService.ReadMessgaes();
}

注册服务:

// Register the connection factory and the consumer as singletons,
// then the hosted service that starts the consumer.
services
    .AddSingleton<IRabbitMqService, RabbitMqService>()
    .AddSingleton<IConsumerService, ConsumerService>()
    .AddHostedService<ConsumerHostedService>();

在这种情况下,当应用程序停止时,您的消费者将自动停止。

附加信息:

appsettings.json

{
  "RabbitMqConfiguration": {
    "HostName": "localhost",
    "Username": "guest",
    "Password": "guest"
  }
}

RabbitMqConfiguration

/// <summary>
/// Settings object for the RabbitMQ connection. The property names match the keys of the
/// "RabbitMqConfiguration" section shown in appsettings.json
/// (NOTE(review): the binding call itself is not shown here — confirm it is registered).
/// </summary>
public class RabbitMqConfiguration
{
    /// <summary>Broker host name; assigned to the connection factory's HostName.</summary>
    public string HostName { get; set; }
    /// <summary>User name; assigned to the connection factory's UserName.</summary>
    public string Username { get; set; }
    /// <summary>Password; assigned to the connection factory's Password.</summary>
    public string Password { get; set; }
}

参考


0
投票

我发现最好的方法之一是使用

BackgroundService

   /// <summary>
   /// Background service that opens a RabbitMQ connection and channel in its constructor,
   /// declares the "queue" queue, and consumes from it with manual acknowledgements.
   /// </summary>
   public class TempConsumer : BackgroundService
{
    private readonly ConnectionFactory _factory;
    private readonly IConnection _connection;
    private readonly IModel _channel;

    /// <summary>
    /// Creates the connection and channel and declares the non-durable "queue" queue.
    /// </summary>
    public TempConsumer()
    {
        // Sample connection settings are hard-coded here; real applications
        // should load them from configuration rather than source code.
        _factory = new ConnectionFactory()
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "password",
            VirtualHost = "/",
        };
        _connection = _factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: "queue",
                                durable: false,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
    }

    /// <summary>
    /// Registers an event-based consumer on the channel. Each delivery is decoded as UTF-8,
    /// acknowledged, and written to the console.
    /// </summary>
    /// <param name="stoppingToken">Checked once on entry; throws if a stop was already requested.</param>
    /// <returns>A completed task; message handling continues via the consumer's events.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();

        var consumer = new EventingBasicConsumer(_channel);

        consumer.Shutdown += OnConsumerShutdown;
        consumer.Registered += OnConsumerRegistered;
        consumer.Unregistered += OnConsumerUnregistered;
        consumer.ConsumerCancelled += OnConsumerConsumerCancelled;

        consumer.Received += (model, ea) =>
        {
            Console.WriteLine("Recieved");
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            Console.WriteLine(message);
        };

        _channel.BasicConsume(queue: "queue",
                             autoAck: false,
                             consumer: consumer);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Closes the channel and connection opened in the constructor (if still open)
    /// before running the base class disposal, so they are not leaked on shutdown.
    /// </summary>
    public override void Dispose()
    {
        if (_channel.IsOpen)
        {
            _channel.Close();
        }

        if (_connection.IsOpen)
        {
            _connection.Close();
        }

        base.Dispose();
    }

    // Empty hooks for consumer lifecycle events; extend as needed.
    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }

    // NOTE(review): this handler is not subscribed to any event in this class;
    // presumably intended for the connection's shutdown event — confirm.
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
}

然后,将消费者注册为托管服务

// Register the TempConsumer background service so the host starts and stops it.
services.AddHostedService<TempConsumer>();

© www.soinside.com 2019 - 2024. All rights reserved.