MassTransit - 如何设置作业消费者一次处理 1 个作业?

问题描述 投票:0回答:1

我正在发布一条消息来开始工作。我有一个消费者设置,据我所知,我已将其配置为一次处理 1 个作业。所有其他作业都应保留在队列中,直到该作业完成。问题是,一旦 1 个作业开始,并且我发布了更多作业,它们就会被发送到 job_error 队列,并显示消息

The JobSubmitted event is not handled during the Started state for the JobStateMachine state machine

我尝试了多种组合来使其发挥作用,但我似乎无法弄清楚。是我设置不当吗?

程序配置

 builder.Services.AddMassTransit(x =>
 {
     x.AddDelayedMessageScheduler();
     x.SetEndpointNameFormatter(new SnakeCaseEndpointNameFormatter(includeNamespace: true));

     x.AddConsumer<OnArchiveTimedRequestsToCheaperStorage, OnArchiveTimedRequestsToCheaperStorageDefinition>();

     //x.AddConsumers(typeof(Program).Assembly);

     x.SetJobConsumerOptions();
     x.SetInMemorySagaRepositoryProvider();
     x.AddJobSagaStateMachines();

     x.UsingAzureServiceBus((context, cfg) =>
     {
         x.AddDelayedMessageScheduler();
         cfg.Host(builder.Configuration["AzureServiceBusConfiguration:ConnectionString"]);
         cfg.SetNamespaceSeparatorTo("_");

         cfg.ConfigureEndpoints(context);
         cfg.UseServiceBusMessageScheduler();
     });
 });

消费者

public class OnArchiveTimedRequestsToCheaperStorage : IJobConsumer<ArchiveTimedRequestsToCheaperStorage>
{
    public async Task Run(JobContext<ArchiveTimedRequestsToCheaperStorage> context)
    {
        bool run = true;
        while (run)
        {
            await Task.Delay(2000);
        }
    }
}


public class OnArchiveTimedRequestsToCheaperStorageDefinition : ConsumerDefinition<OnArchiveTimedRequestsToCheaperStorage>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator
        , IConsumerConfigurator<OnArchiveTimedRequestsToCheaperStorage> consumerConfigurator
        , IRegistrationContext context)
    {
        consumerConfigurator.Options<JobOptions<ArchiveTimedRequestsToCheaperStorage>>(options => options
            .SetConcurrentJobLimit(1));
    }
}

蔚蓝SB

c# masstransit
1个回答
0
投票

您使用相同的

JobId
提交作业,或者如果您不使用
SubmitJob<T>
,则指定相同的
RequestId
属性,然后将其用于
JobId

每个作业都必须有一个唯一的标识符。

© www.soinside.com 2019 - 2024. All rights reserved.