我正在发布一条消息来开始工作。我有一个消费者设置,据我所知,我已将其配置为一次处理 1 个作业。所有其他作业都应保留在队列中,直到该作业完成。问题是,一旦 1 个作业开始,并且我发布了更多作业,它们就会被发送到 job_error 队列,并显示消息
The JobSubmitted event is not handled during the Started state for the JobStateMachine state machine
。
我尝试了多种组合来使其发挥作用,但我似乎无法弄清楚。是我设置不当吗?
程序配置
builder.Services.AddMassTransit(x =>
{
x.AddDelayedMessageScheduler();
x.SetEndpointNameFormatter(new SnakeCaseEndpointNameFormatter(includeNamespace: true));
x.AddConsumer<OnArchiveTimedRequestsToCheaperStorage, OnArchiveTimedRequestsToCheaperStorageDefinition>();
//x.AddConsumers(typeof(Program).Assembly);
x.SetJobConsumerOptions();
x.SetInMemorySagaRepositoryProvider();
x.AddJobSagaStateMachines();
x.UsingAzureServiceBus((context, cfg) =>
{
x.AddDelayedMessageScheduler();
cfg.Host(builder.Configuration["AzureServiceBusConfiguration:ConnectionString"]);
cfg.SetNamespaceSeparatorTo("_");
cfg.ConfigureEndpoints(context);
cfg.UseServiceBusMessageScheduler();
});
});
消费者
public class OnArchiveTimedRequestsToCheaperStorage : IJobConsumer<ArchiveTimedRequestsToCheaperStorage>
{
public async Task Run(JobContext<ArchiveTimedRequestsToCheaperStorage> context)
{
bool run = true;
while (run)
{
await Task.Delay(2000);
}
}
}
public class OnArchiveTimedRequestsToCheaperStorageDefinition : ConsumerDefinition<OnArchiveTimedRequestsToCheaperStorage>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator
, IConsumerConfigurator<OnArchiveTimedRequestsToCheaperStorage> consumerConfigurator
, IRegistrationContext context)
{
consumerConfigurator.Options<JobOptions<ArchiveTimedRequestsToCheaperStorage>>(options => options
.SetConcurrentJobLimit(1));
}
}
您使用相同的
JobId
提交作业,或者如果您不使用 SubmitJob<T>
,则指定相同的 RequestId
属性,然后将其用于 JobId
。
每个作业都必须有一个唯一的标识符。