如何在消费者范围内使用BusOutbox?

问题描述 投票:0回答:1

总线配置如下(使用 RabbitMq):

busConfigurator.AddEntityFrameworkOutbox<MyDbContext>(o =>
{
    o.UsePostgres();
    o.UseBusOutbox();
});

并且它在控制器中运行良好:

class MyController
{
    private readonly MyDbContext _dbContext;

    private readonly IPublishEndpoint _publishEndpoint;

    public MyController(MyDbContext dbContext, IPublishEndpoint publishEndpoint)
    {
        _dbContext = dbContext;
        _publishEndpoint = publishEndpoint;
    }

    public async Task Handle()
    {
        await _dbContext.Add(...);
        await _publishEndpoint.Publish(new OtherMessage()); // <-- is being send through outbox after _dbContext.SaveChangesAsync() below
        await _dbContext.SaveChangesAsync();
    }
}

但是它不适用于消费者或任何依赖消费者的服务:

class MyConsumer : IConsumer<MyMessage>
{
    private readonly MyDbContext _dbContext;

    private readonly IPublishEndpoint _publishEndpoint;

    public MyConsumer(MyDbContext dbContext, IPublishEndpoint publishEndpoint)
    {
        _dbContext = dbContext;
        _publishEndpoint = publishEndpoint;
    }

    public async Task Consume(ConsumeContext<MyMessage> context)
    {
        await _publishEndpoint.Publish(new OtherMessage()); // is being sent directly to message broker before exception is thrown
        throw new Exception("test");
    }
}

据我所知(从文档)这是预期的行为,因为消费者发件箱未配置。如果我添加以下配置:

busConfigurator.AddConfigureEndpointsCallback((context, name, cfg) =>
{
    cfg.UseEntityFrameworkOutbox<MyDbContext>(context);
});

然后它在消费者内部工作,抛出异常后不会发布消息,因为据我所知,它现在使用事务发件箱,并且“Consume”方法是用事务“包装”的。

但是现在(使用消费者发件箱)我也无法控制交易:

class MyConsumer : IConsumer<MyMessage>
{
    private readonly MyDbContext _dbContext;

    private readonly IPublishEndpoint _publishEndpoint;

    public MyConsumer(MyDbContext dbContext, IPublishEndpoint publishEndpoint)
    {
        _dbContext = dbContext;
        _publishEndpoint = publishEndpoint;
    }

    public async Task Consume(ConsumeContext<MyMessage> context)
    {
        await _publishEndpoint.Publish(new OtherMessage()); // is being sent through outbox even if I don't call _dbContext.SaveChangesAsync()
    }
}

这很烦人。有没有办法在消费者内部实现 BusOutbox 行为?我只想在显式调用 _dbContext.SaveChangesAsync() 时以原子方式发送消息,并且不希望创建事务“包装”Consume 方法。

使用大众运输 8.2.0。

masstransit
1个回答
0
投票

简短的回答,不,您必须使用文档中概述的消费者发件箱。您不能使用消费者中的“公交车”发件箱,MassTransit 有意控制交易以确保准确一次交付。

© www.soinside.com 2019 - 2024. All rights reserved.