处理RabbitMQ错误队列的最理想方式

问题描述 投票:0回答:1

我的应用程序有一个用于处理通知的.net core微服务,并且它已部署在Kubernetes上。其中NotificationRequestConsumer如下,(请注意,这只是一个代码片段来阐述我的问题)

public class NotificationRequestConsumer : IConsumer<INotificationRequest>
{
    public NotificationRequestConsumer()
    {
        
    }
    public Task Consume(ConsumeContext<INotificationRequest> context)
    {
        // notification request logic goes here
        return Task.CompletedTask;
    }
}

如何在启动时配置大众交通。

public static IServiceCollection AddMassTransitConnection(this IServiceCollection services, IConfiguration configuration)
{
    services.AddMassTransit(x =>
    {
        x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(c =>
        {
            c.Host(configuration["RabbitMQ:HostUrl"]);
            c.ConfigureEndpoints(context);
        }));
        
        x.AddConsumer<NotificationRequestConsumer>(c => c.UseMessageRetry(r => r.Interval(1,500)));
    });

    services.AddMassTransitHostedService();

    return services;
}

根据上面的代码,我设置了几毫秒的间隔,以便在警报处理时发生任何错误时重试。如果出现问题,我使用故障消费者将相关请求的数据存储在数据库中,以供将来使用(以便将来手动发送相关通知)。

public class NotificationRequestFaultConsumer : IConsumer<Fault<INotificationRequest>>
{
    public Task Consume(ConsumeContext<Fault<INotificationRequest>> context)
    {
        //For future use, I store the relevant data here 
        return Task.CompletedTask;
    }
}

即使我这样做了,相关的异常也会被添加到RabbitMQ错误队列中。据我所知,这是交通运作的一部分。

我的担忧如下,

  1. 错误队列不断增长会导致集群崩溃吗?
  2. 只记录到 ELK Stack 而不抛出异常并且不将它们添加到 RabbitMQ 错误队列中是一个好方法吗?
  3. 是否可以给出特定的过期标准来自动删除错误队列,这是一个好主意吗?
kubernetes .net-core rabbitmq masstransit
1个回答
1
投票

您可以使用

dead-letter-queues
,这是rabbitMQ内置的机制,用于在这些情况下处理消息参考官方文档

  1. 消费者使用 basic.reject 或 basic.nack 否定确认消息,并将 requeue 参数设置为 false。
  2. 由于每条消息的 TTL,消息过期;或
  3. 消息因队列超出长度限制而被丢弃
© www.soinside.com 2019 - 2024. All rights reserved.