MassTransit (C#) 中跨端点的并发限制

问题描述 投票:0回答:2

公共交通3.5, 兔子Mq

我有一个总线连接服务,可以监听队列数量。

  • Q1
  • Q2
  • Q3
  • 第四季度 等等

所有队列都接受相同的消息类型,并且所有队列都使用竞争消费者模式,其中不同机器上有多个服务侦听这些相同的队列。

我所追求的是为每个进程的所有这些队列设置并发限制。

我的资源(可用许可证数量)有限,并且需要将并发请求限制为每台计算机的可用许可证数量。

所以机器 A 可能有 4 个,机器 B 可能有 10 个,等等。

如果我已经有 4 个消费者处理所有队列中的消息,我不希望机器 A 消费所有这些队列中的任何消息,它应该让其他人(机器 B、C 等)在有可用资源的情况下消费它。

我的问题是使用

    sbc.UseConcurrencyLimit(4)

里面

    var bus = MassTransit.Bus.Factory.CreateUsingRabbitMq

设置每个队列的并发限制,所以如果我有 4 个队列,它们就会全部相加。

我需要的是所有队列累计达到并发限制,但不超过它。

MassTransit 中有内置方法可以实现此目标吗?

c# concurrency rabbitmq masstransit
2个回答
0
投票

没有内置方法来限制多个服务之间的并发。这需要使用某种类型的全局资源管理器,而 MassTransit 并不支持开箱即用。


0
投票

我正在为新版本的 MassTransit 添加此答案。目前,版本 8,我可以累计限制并发数。

有两个地方可以设置并发限制。

其中一个是定义 ReceiveEndpoint 时。这将设置每个队列的最大并发数。

cfg.ReceiveEndpoint("Q_Import_Import", e =>
{
    e.ConfigureConsumer<TrafficImportConsumer>(context);
    e.UseConcurrencyLimit(1);
});

第二个地方是当您添加 MassTransit 服务时。通过设置

UseConcurrencyLimit
可以累计设置最大并发数

builder.Services.AddMassTransit(x => 
x.UsingInMemory((context, cfg) =>
{
    cfg.UseConcurrencyLimit(4);
    cfg.ConfigureEndpoints(context);
});
© www.soinside.com 2019 - 2024. All rights reserved.