如何为通过 AddConsumer() 创建的队列配置 PurgeOnStartup?

问题描述 投票:0回答:1

防止 XY 问题的一些背景: 我正在开发一个系统,其中每 X 分钟发送一条消息以在服务中的 Pod 上运行作业。当所有 pod 都不健康或重新启动时,这些消息可能会在队列中堆积。然后,当 Pod 重新启动时,它们开始立即消耗所有消息。这可能会导致问题,因为作业运行的频率不得超过每 X 分钟一次,这一点很重要。

此问题的解决方案似乎是 PurgeOnStartup 标志。当 Pod 重新启动时,它会清除队列,然后等待下一条作业消息的出现。我在将 PurgeOnStartup 添加到我们的配置中时遇到一些问题。

目前,我们将消费者添加到这样的配置中:

configurator.AddConsumer<SendTemplatedEmailMessage2Consumer>(c => c.UseConcurrentMessageLimit(1));

然后由中央 nuget 包中的代码使用它:

            configurator.UsingRabbitMq((context, cfg) =>
                {
                    if (settings.RabbitmqDelayedMessageExchangeEnabled)
                    {
                        cfg.UseConsumeFilter(typeof(HealthyConsumerFilter<>), context);
                    }
                    cfg.Host(settings.GetUri(), host =>
                    {
                        host.Username(settings.UserName);
                        host.Password(settings.Password);
                    });
                    cfg.ConfigureEndpoints(context);

第二部分是我们在所有服务中使用的设置,所以我不想更改它。我也只想将其添加到这个队列中。但是,AddConsumer() 配置器似乎不允许设置 PurgeOnStartup。我在过滤器和管道规格中有点迷失了,它似乎不是这样的。 是否可以在通过 AddConsumer() 创建的队列上配置 PurgeOnStartup? 如果没有,如何通过UsingRabbitMq 配置将其添加到单个消费者/队列?

c# rabbitmq masstransit
1个回答
0
投票

嗯,看来您想为通过 RabbitMQ 中的 AddConsumer 方法创建的某个队列设置 PurgeOnStartup 标志。不幸的是,AddConsumer 方法本身并不直接支持这样的 PurgeOnStartup 标志配置。

为此,您可能必须采取相反的方式。您可以尝试手动构建使用者并为其提供所需的配置 - 换句话说,让 PurgeOnStartup 自己标记 - 而不通过 AddConsumer 方法。

以下是您可以如何做到这一点:

cfg.ReceiveEndpoint("your_queue_name", endpoint =>
{
    endpoint.UseConcurrentMessageLimit(1); // Your existing configuration
    
    // Set PurgeOnStartup for this specific queue
    endpoint.PurgeOnStartup = true; // Set this flag to purge on startup

    endpoint.Consumer<SendTemplatedEmailMessage2Consumer>(context);
});

这样,您可以为特定队列创建使用者(

your_queue_name
应替换为您的实际队列名称)并为该队列设置 PurgeOnStartup 标志,同时保持其余配置不变。

© www.soinside.com 2019 - 2024. All rights reserved.