在以下场景中,API 使用 SqlServer 事务发件箱将消息发布到队列。
另一个组件使用该消息并使用 SqlServer 事务发件箱向第三个组件触发另一条消息。
API中的流程执行正确,并利用了发件箱,将实体和消息保存为事务,同时可以看到PublishEndpointProvider具有发件箱中间件。
当我尝试在公共交通消费者中执行相同的发件箱配置时,就会出现问题。 (分享消费者代码)
public class ConsumeOrderPublished : IConsumer<OrderPublished>
{
private readonly IPublishEndpoint _publishEndpoint;
private readonly IPaymentRepository _paymentRepository;
private readonly IUnitOfWork _unitOfWork;
public ConsumeOrderPublished(IPublishEndpoint publishEndpoint,
IPaymentRepository paymentRepository, IUnitOfWork unitOfWork)
{
_publishEndpoint = publishEndpoint;
_paymentRepository = paymentRepository;
_unitOfWork = unitOfWork;
}
public async Task Consume(ConsumeContext<OrderPublished> context)
{
var orderPublished = context.Message;
var payment = new Payment
{
Id = Guid.NewGuid(),
Amount = 20,
};
await _unitOfWork.BeginTransactionAsync(context.CancellationToken);
await _paymentRepository.AddAsync(payment, context.CancellationToken);
await _publishEndpoint.Publish<OrderProcessed>(new
{
OrderId = orderPublished.Id,
PaymentId = payment.Id,
});
await _unitOfWork.CommitTransactionAsync();
}
}
上下文和注入的 IPublishEndpoint 都没有发件箱中间件配置
当我删除 SqlServer 发件箱并使用内存中发件箱时,行为有所不同,内存中发件箱正确配置了中间件
这是worker组件中的Masstransit配置,在这个repo中也有源代码。我正在使用 Masstransit v8.1.1
public static class MasstransitWorkerExtensions
{
public static IServiceCollection AddMasstransitWorkerConfiguration(this IServiceCollection services,
IConfiguration configuration)
{
var busOptions = configuration.GetSection(BrokerOptions.SectionName).Get<BrokerOptions>();
services.AddMassTransit(x => {
x.AddEntityFrameworkOutbox<OrderDbContext>(o =>
{
o.QueryDelay = TimeSpan.FromSeconds(1);
o.UseSqlServer();
o.UseBusOutbox(c => c.DisableDeliveryService());
});
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<ConsumeOrderPublished>();
x.AddConsumer<ConsumeOrderProcessed>();
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(busOptions.ConnectionString);
cfg.ConfigureEndpoints(ctx);
// cfg.UseInMemoryOutbox();
// when activate this line and remove lines 16 - 21 the outbox works fine
});
});
return services;
}
}
我想在公共交通消费者中使用事务发件箱模式。我发现这个post说这个可能的问题已经解决了。
您没有在任何消费者上配置它,因此它不会使用它。必须配置事务发件箱才能正常工作。
您可以通过添加回调来配置所有接收端点上的发件箱:
x.AddConfigureEndpointsCallback((context, name, cfg) =>
{
cfg.UseMessageRetry(r => r.Intervals(100, 500, 1000, 5000, 10000));
cfg.UseEntityFrameworkOutbox<OrderDbContext>(context);
});