我一直在玩PreFetch,并尝试弄清为什么在队列的管理界面上PreFetch始终设置为0。在RabbitMQ管理界面中,我可以在通道上看到已配置的预取,但看不到队列本身。我还注意到他们被注册为“全球”而不是“每个消费者”,但是就我的一生而言,我似乎找不到在MassTransit中进行更改的设置,尽管我猜我有误解的工作方式,文档并没有帮助我取得ELI5。
这是示例配置:
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
"TEST-QUEUE-PF",
ec =>
{
ec.Consumer<MyConsumer>(context);
ec.PrefetchCount = 50; // consumer specific
ec.UseConcurrencyLimit(1); // consumer specific
});
cfg.PrefetchCount = 100; // bus control specific
cfg.UseConcurrencyLimit(1); // bus control specific
});
这将创建以下队列:
然后查看频道,我会看到有关预取的以下信息:
如果我查看所有频道,就会看到以下内容:
我正在努力了解这些PrefetchCounts分别与什么相关。
[作为背景,我们有几台运行消费者的多核服务器(即轮循,或更合适的是“饥饿的河马”,因为我不在乎平均分配)。 PrefetchCount和ConcurrencyLimit的默认设置不能很好地为我们服务,因为我们的使用者有很多工作要做,并且数据库服务器超载导致超时。我正在寻找一种配置这些使用者的方法,以使他们不这样做。
这是MassTransit 5.5.5,因为任何超出此范围的操作都会破坏UseSerilog()集成,因此我找不到简单的升级路径。 Erlang和RabbitMq本身是当前版本。这是更详细的AutoFac模块:
private class BusModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
builder.Register(context =>
{
var busSettings = context.Resolve<BusSettings>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
$"TEST-QUEUE-GLOBAL", // shared queue name for all consumers
ec =>
{
ec.PrefetchCount = 50;
ec.UseConcurrencyLimit(2);
ec.Consumer<MyConsumer>(context);
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
retryConfig
.Intervals(new[] { 1, 2, 4, 8, 16, 32 }
.Select(t => TimeSpan.FromMinutes(t))
.ToArray());
retryConfig
.Handle<HttpRequestException>();
retryConfig
.Handle<SwaggerException>(ex => ex.IsRetryValid());
});
});
cfg.PrefetchCount = 100;
cfg.UseConcurrencyLimit(2);
cfg.UseSerilog();
var correlationIdProvider = context.Resolve<ICorrelationProvider>();
cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
{
sendContext.CorrelationId =
sendContext.CorrelationId == Guid.Empty ?
correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
}));
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
}
}
[首先,我假设您使用的是MassTransit的较旧版本,因为切换是从v6开始,不再使用全局预取。
第二,高的预取计数与并发限制1结合将导致(prefetchcount-1)消息位于接收端点上,等待同时处理1条消息。因此,如果只有50条消息,则第一个节点可能会全部接收它们,然后您的其他节点处于空闲状态,因为消息正在等待单个节点上的瓶颈。
RabbitMQ管理控制台的当前版本,带有频道预取,如下所示:
由于MassTransit仅在频道上放置了一个使用者,因此以前的方法实质上将使用者限制在全局频道预取中,但是现在更加明确了。另外,新设置适用于仲裁队列,该队列不支持全局预取设置。
如果您正在超载数据库,并且已经优化了数据库查询以避免锁定/阻塞,并且需要减少流量,请降低预取,使其接近并发限制的140%。因此,认真地讲,如果您的数字为1,则将预取设置为2。