Masstransit 作业服务消费者并发限制为每分钟 2 个请求

问题描述 投票:0回答:0

我正在测试基于 JobConsumersSample 的 Masstransit 的甜蜜 JobConsumer 设置。当我使用 masstransit/rabbitmq 图像在本地运行时,一切正常,作业按照配置的并发限制运行。但是当我在 k8s 中针对真正的 RabbitMQ 安装进行尝试时,它将在第一分钟内处理 1 个作业,之后非常一致地处理 2 个/分钟。很明显问题出在我的 RabbitMQ 设置上,但我不知道从哪里开始。有什么建议吗?

RabbitMQ 版本为 3.9.16,带有 rabbitmq-delayed-message-exchange V3.9.0

我这样设置我的 ConsumerDefinition:

我不认为问题出在我的 Masstransit 设置上,但以防万一:

   public class ExtractDocumentJobConsumerConfiguration : ConsumerDefinition<ExtractDocumentJobConsumer>
    {
        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
      IConsumerConfigurator<ExtractDocumentJobConsumer> consumerConfigurator)
        {
            consumerConfigurator.Options<JobOptions<ExtractDocument>>(options =>
                options
                    .SetRetry(r => r.Interval(3, TimeSpan.FromSeconds(30)))
                    .SetJobTimeout(TimeSpan.FromMinutes(10))
                    .SetConcurrentJobLimit(20));
        }
    }

在我的 Program.cs 中

 builder.Services
                .AddMassTransit(
                    x =>
                    {
                        x.AddDelayedMessageScheduler();

                        x.AddConsumer<ExtractDocumentJobConsumer, ExtractDocumentJobConsumerConfiguration>();
                        //.Endpoint(e => e.Name = "extraction-job-queue");

                        x.AddConsumer<TrackAnalysisJobConsumer>();

                        x.AddSagaRepository<JobSaga>().InMemoryRepository();
                        x.AddSagaRepository<JobTypeSaga>().InMemoryRepository();
                        x.AddSagaRepository<JobAttemptSaga>().InMemoryRepository();

                        x.SetKebabCaseEndpointNameFormatter();

                        x.UsingRabbitMq(
                            (context, cfg) =>
                            {
                                cfg.Host(
                                    rabbitMQConfiguration.HostName,
                                    rabbitMQConfiguration.VirtualHost,
                                    h =>
                                    {
                                        h.Username(rabbitMQConfiguration.UserName);
                                        h.Password(rabbitMQConfiguration.Password);
                                    });

                                cfg.UseDelayedMessageScheduler();

                                var options = new ServiceInstanceOptions()
                                    .SetEndpointNameFormatter(context.GetService<IEndpointNameFormatter>() ?? KebabCaseEndpointNameFormatter.Instance);

                                cfg.ServiceInstance(
                                    options,
                                    instance =>
                                    {
                                        instance.ConfigureJobServiceEndpoints(
                                            js =>
                                            {
                                                js.SagaPartitionCount = 1;
                                                js.FinalizeCompleted = true;
                                                js.ConfigureSagaRepositories(context);
                                            });
                                        //instance.InstanceEndpointConfigurator.ConcurrentMessageLimit = 20;
                                        instance.ConfigureEndpoints(
                                            context,
                                            f => f.Include<ExtractDocumentJobConsumer>());
                                    });

                                // Configure the remaining consumers
                                cfg.ConfigureEndpoints(context);
                            });
                    });

            builder.Services
              .AddOptions<MassTransitHostOptions>()
              .Configure(
                  options =>
                  {
                      options.WaitUntilStarted = true;
                      options.StartTimeout = TimeSpan.FromSeconds(45);
                      options.StopTimeout = TimeSpan.FromSeconds(45);
                  });

我不得不注释掉

.Enpoint(e => e.Name = "extraction-job-queue")
,否则它会给我一个错误,说端点已经存在。

masstransit
© www.soinside.com 2019 - 2024. All rights reserved.