大众交通交易发件箱不使用消费者上下文中的发件箱

问题描述 投票:0回答:1

在以下场景中,API 使用 SqlServer 事务发件箱将消息发布到队列。

另一个组件使用该消息并使用 SqlServer 事务发件箱向第三个组件触发另一条消息。

API中的流程执行正确,并利用了发件箱,将实体和消息保存为事务,同时可以看到PublishEndpointProvider具有发件箱中间件。

Outbox in API

当我尝试在公共交通消费者中执行相同的发件箱配置时,就会出现问题。 (分享消费者代码)

public class ConsumeOrderPublished : IConsumer<OrderPublished>
{
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly IPaymentRepository _paymentRepository;
    private readonly IUnitOfWork _unitOfWork;

    public ConsumeOrderPublished(IPublishEndpoint publishEndpoint,
        IPaymentRepository paymentRepository, IUnitOfWork unitOfWork)
    {
        _publishEndpoint = publishEndpoint;
        _paymentRepository = paymentRepository;
        _unitOfWork = unitOfWork;
    }

    public async Task Consume(ConsumeContext<OrderPublished> context)
    {
        var orderPublished = context.Message;
        var payment = new Payment
        {
            Id = Guid.NewGuid(),
            Amount = 20,
        };

        await _unitOfWork.BeginTransactionAsync(context.CancellationToken);
        await _paymentRepository.AddAsync(payment, context.CancellationToken);
        await _publishEndpoint.Publish<OrderProcessed>(new
        {
            OrderId = orderPublished.Id,
            PaymentId = payment.Id,
        });

        await _unitOfWork.CommitTransactionAsync();
    }
}

上下文和注入的 IPublishEndpoint 都没有发件箱中间件配置 Consumer context in worker Consumer IPublishEndpoint in worker

当我删除 SqlServer 发件箱并使用内存中发件箱时,行为有所不同,内存中发件箱正确配置了中间件 Consumer with in memory outbox

这是worker组件中的Masstransit配置,在这个repo中也有源代码。我正在使用 Masstransit v8.1.1

public static class MasstransitWorkerExtensions
{
    public static IServiceCollection AddMasstransitWorkerConfiguration(this IServiceCollection services,
        IConfiguration configuration)
    {
        var busOptions = configuration.GetSection(BrokerOptions.SectionName).Get<BrokerOptions>();

        services.AddMassTransit(x => {
            x.AddEntityFrameworkOutbox<OrderDbContext>(o =>
            {
                o.QueryDelay = TimeSpan.FromSeconds(1);
                o.UseSqlServer();
                o.UseBusOutbox(c => c.DisableDeliveryService());
            });

            x.SetKebabCaseEndpointNameFormatter();

            x.AddConsumer<ConsumeOrderPublished>();
            x.AddConsumer<ConsumeOrderProcessed>();

            x.UsingRabbitMq((ctx, cfg) =>
            {
                cfg.Host(busOptions.ConnectionString);
                cfg.ConfigureEndpoints(ctx);
                // cfg.UseInMemoryOutbox(); 
                // when activate this line and remove lines 16 - 21 the outbox works fine
            });
        });

        return services;
    }
}

我想在公共交通消费者中使用事务发件箱模式。我发现这个post说这个可能的问题已经解决了。

c# sql-server api masstransit worker
1个回答
0
投票

您没有在任何消费者上配置它,因此它不会使用它。必须配置事务发件箱才能正常工作。

您可以通过添加回调来配置所有接收端点上的发件箱:

x.AddConfigureEndpointsCallback((context, name, cfg) => 
{
    cfg.UseMessageRetry(r => r.Intervals(100, 500, 1000, 5000, 10000));
    cfg.UseEntityFrameworkOutbox<OrderDbContext>(context);
});
© www.soinside.com 2019 - 2024. All rights reserved.