MassTransit-PrefetchCount和单个消费者的多个渠道的说明

问题描述 投票:0回答:1

我一直在玩PreFetch,并尝试弄清为什么在队列的管理界面上PreFetch始终设置为0。在RabbitMQ管理界面中,我可以在通道上看到已配置的预取,但看不到队列本身。我还注意到他们被注册为“全球”而不是“每个消费者”,但是就我的一生而言,我似乎找不到在MassTransit中进行更改的设置,尽管我猜我有误解的工作方式,文档并没有帮助我取得ELI5。

这是示例配置:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
   var host = cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });

    cfg.ReceiveEndpoint(
        host,
        "TEST-QUEUE-PF",
        ec =>
        {
            ec.Consumer<MyConsumer>(context);
            ec.PrefetchCount = 50; // consumer specific
            ec.UseConcurrencyLimit(1); // consumer specific
        });

    cfg.PrefetchCount = 100; // bus control specific
    cfg.UseConcurrencyLimit(1); // bus control specific
});

这将创建以下队列:

Queue

然后查看频道,我会看到有关预取的以下信息:

enter image description here

如果我查看所有频道,就会看到以下内容:

enter image description here

我正在努力了解这些PrefetchCounts分别与什么相关。

[作为背景,我们有几台运行消费者的多核服务器(即轮循,或更合适的是“饥饿的河马”,因为我不在乎平均分配)。 PrefetchCount和ConcurrencyLimit的默认设置不能很好地为我们服务,因为我们的使用者有很多工作要做,并且数据库服务器超载导致超时。我正在寻找一种配置这些使用者的方法,以使他们不这样做。

这是MassTransit 5.5.5,因为任何超出此范围的操作都会破坏UseSerilog()集成,因此我找不到简单的升级路径。 Erlang和RabbitMq本身是当前版本。这是更详细的AutoFac模块:

private class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
        builder.Register(context =>
        {
            var busSettings = context.Resolve<BusSettings>();
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(
                    new Uri(busSettings.HostAddress),
                    h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                cfg.ReceiveEndpoint(
                    host,
                    $"TEST-QUEUE-GLOBAL", // shared queue name for all consumers
                    ec =>
                    {
                        ec.PrefetchCount = 50;
                        ec.UseConcurrencyLimit(2);
                        ec.Consumer<MyConsumer>(context);
                        ec.EnablePriority(5);
                        ec.UseRetry(retryConfig =>
                        {
                            retryConfig
                                .Intervals(new[] { 1, 2, 4, 8, 16, 32 }
                                .Select(t => TimeSpan.FromMinutes(t))
                                .ToArray());
                            retryConfig
                                .Handle<HttpRequestException>();
                            retryConfig
                                .Handle<SwaggerException>(ex => ex.IsRetryValid());
                        });
                    });

                cfg.PrefetchCount = 100;
                cfg.UseConcurrencyLimit(2);
                cfg.UseSerilog();

                var correlationIdProvider = context.Resolve<ICorrelationProvider>();
                cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
                {
                    sendContext.CorrelationId = 
                        sendContext.CorrelationId == Guid.Empty ? 
                            correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
                }));
            });

            return busControl;
        })
        .SingleInstance()
        .As<IBusControl>()
        .As<IBus>();
    }
}
c# rabbitmq masstransit
1个回答
1
投票

[首先,我假设您使用的是MassTransit的较旧版本,因为切换是从v6开始,不再使用全局预取。

第二,高的预取计数与并发限制1结合将导致(prefetchcount-1)消息位于接收端点上,等待同时处理1条消息。因此,如果只有50条消息,则第一个节点可能会全部接收它们,然后您的其他节点处于空闲状态,因为消息正在等待单个节点上的瓶颈。

RabbitMQ管理控制台的当前版本,带有频道预取,如下所示:

RabbitMQ Prefetch Count

由于MassTransit仅在频道上放置了一个使用者,因此以前的方法实质上将使用者限制在全局频道预取中,但是现在更加明确了。另外,新设置适用于仲裁队列,该队列不支持全局预取设置。

如果您正在超载数据库,并且已经优化了数据库查询以避免锁定/阻塞,并且需要减少流量,请降低预取,使其接近并发限制的140%。因此,认真地讲,如果您的数字为1,则将预取设置为2。

© www.soinside.com 2019 - 2024. All rights reserved.