我将 MassTransit v8.0.8 库与 Azure 服务总线和 EF Core 结合使用。以下是我设置服务的方式。其中一个消费者运行的时间稍长一些,因为它处理大量记录(例如 50K)。在处理它们时,大约 5 分钟后,我的消费者似乎再次使用相同的消息(相同的相关 ID)。那么,我可以得到一些关于如何避免这种重复消费的建议吗?
services.AddMassTransit(busRegistrationConfigurator =>
{
busRegistrationConfigurator.SetKebabCaseEndpointNameFormatter();
busRegistrationConfigurator.UsingAzureServiceBus((registrationContext, busFactoryConfigurator) =>
{
busFactoryConfigurator.ConfigureEndpoints(registrationContext);
busFactoryConfigurator.Host(busConnectionString);
});
foreach (var implementation in consumerImplementations)
{
busRegistrationConfigurator.AddConsumer(implementation);
}
busRegistrationConfigurator.AddEntityFrameworkOutbox<TDbContext>(outboxConfigurator =>
{
outboxConfigurator.UseSqlServer();
outboxConfigurator.UseBusOutbox();
});
});
您需要设置
LockDuration
和 MaxAutoRenewDuration
的组合,以便消息在持续时间内保持锁定状态。
此外,如果您想与消费者一起使用交易发件箱,您还需要进行配置。
x.AddConfigureEndpointsCallback((context, name, cfg) =>
{
cfg.UseMessageRetry(r => r.Intervals(100, 500, 1000, 5000, 10000));
cfg.UseEntityFrameworkOutbox<TDbContext>(context);
if(cfg is IServiceBusReceiveEndpointConfigurator sb)
{
sb.LockDuration = TimeSpan.FromMinutes(5);
sb.MaxAutoRenewDuration = TimeSpan.FromMinutes(37);
});
});