我正在测试基于 JobConsumersSample 的 Masstransit 的甜蜜 JobConsumer 设置。当我使用 masstransit/rabbitmq 图像在本地运行时,一切正常,作业按照配置的并发限制运行。但是当我在 k8s 中针对真正的 RabbitMQ 安装进行尝试时,它将在第一分钟内处理 1 个作业,之后非常一致地处理 2 个/分钟。很明显问题出在我的 RabbitMQ 设置上,但我不知道从哪里开始。有什么建议吗?
RabbitMQ 版本为 3.9.16,带有 rabbitmq-delayed-message-exchange V3.9.0
我这样设置我的 ConsumerDefinition:
我不认为问题出在我的 Masstransit 设置上,但以防万一:
public class ExtractDocumentJobConsumerConfiguration : ConsumerDefinition<ExtractDocumentJobConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ExtractDocumentJobConsumer> consumerConfigurator)
{
consumerConfigurator.Options<JobOptions<ExtractDocument>>(options =>
options
.SetRetry(r => r.Interval(3, TimeSpan.FromSeconds(30)))
.SetJobTimeout(TimeSpan.FromMinutes(10))
.SetConcurrentJobLimit(20));
}
}
在我的 Program.cs 中
builder.Services
.AddMassTransit(
x =>
{
x.AddDelayedMessageScheduler();
x.AddConsumer<ExtractDocumentJobConsumer, ExtractDocumentJobConsumerConfiguration>();
//.Endpoint(e => e.Name = "extraction-job-queue");
x.AddConsumer<TrackAnalysisJobConsumer>();
x.AddSagaRepository<JobSaga>().InMemoryRepository();
x.AddSagaRepository<JobTypeSaga>().InMemoryRepository();
x.AddSagaRepository<JobAttemptSaga>().InMemoryRepository();
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq(
(context, cfg) =>
{
cfg.Host(
rabbitMQConfiguration.HostName,
rabbitMQConfiguration.VirtualHost,
h =>
{
h.Username(rabbitMQConfiguration.UserName);
h.Password(rabbitMQConfiguration.Password);
});
cfg.UseDelayedMessageScheduler();
var options = new ServiceInstanceOptions()
.SetEndpointNameFormatter(context.GetService<IEndpointNameFormatter>() ?? KebabCaseEndpointNameFormatter.Instance);
cfg.ServiceInstance(
options,
instance =>
{
instance.ConfigureJobServiceEndpoints(
js =>
{
js.SagaPartitionCount = 1;
js.FinalizeCompleted = true;
js.ConfigureSagaRepositories(context);
});
//instance.InstanceEndpointConfigurator.ConcurrentMessageLimit = 20;
instance.ConfigureEndpoints(
context,
f => f.Include<ExtractDocumentJobConsumer>());
});
// Configure the remaining consumers
cfg.ConfigureEndpoints(context);
});
});
builder.Services
.AddOptions<MassTransitHostOptions>()
.Configure(
options =>
{
options.WaitUntilStarted = true;
options.StartTimeout = TimeSpan.FromSeconds(45);
options.StopTimeout = TimeSpan.FromSeconds(45);
});
我不得不注释掉
.Enpoint(e => e.Name = "extraction-job-queue")
,否则它会给我一个错误,说端点已经存在。