总线配置如下(使用 RabbitMq):
busConfigurator.AddEntityFrameworkOutbox<MyDbContext>(o =>
{
o.UsePostgres();
o.UseBusOutbox();
});
并且它在控制器中运行良好:
class MyController
{
private readonly MyDbContext _dbContext;
private readonly IPublishEndpoint _publishEndpoint;
public MyController(MyDbContext dbContext, IPublishEndpoint publishEndpoint)
{
_dbContext = dbContext;
_publishEndpoint = publishEndpoint;
}
public async Task Handle()
{
await _dbContext.Add(...);
await _publishEndpoint.Publish(new OtherMessage()); // <-- is being send through outbox after _dbContext.SaveChangesAsync() below
await _dbContext.SaveChangesAsync();
}
}
但是它不适用于消费者或任何依赖消费者的服务:
class MyConsumer : IConsumer<MyMessage>
{
private readonly MyDbContext _dbContext;
private readonly IPublishEndpoint _publishEndpoint;
public MyConsumer(MyDbContext dbContext, IPublishEndpoint publishEndpoint)
{
_dbContext = dbContext;
_publishEndpoint = publishEndpoint;
}
public async Task Consume(ConsumeContext<MyMessage> context)
{
await _publishEndpoint.Publish(new OtherMessage()); // is being sent directly to message broker before exception is thrown
throw new Exception("test");
}
}
据我所知(从文档)这是预期的行为,因为消费者发件箱未配置。如果我添加以下配置:
busConfigurator.AddConfigureEndpointsCallback((context, name, cfg) =>
{
cfg.UseEntityFrameworkOutbox<MyDbContext>(context);
});
然后它在消费者内部工作,抛出异常后不会发布消息,因为据我所知,它现在使用事务发件箱,并且“Consume”方法是用事务“包装”的。
但是现在(使用消费者发件箱)我也无法控制交易:
class MyConsumer : IConsumer<MyMessage>
{
private readonly MyDbContext _dbContext;
private readonly IPublishEndpoint _publishEndpoint;
public MyConsumer(MyDbContext dbContext, IPublishEndpoint publishEndpoint)
{
_dbContext = dbContext;
_publishEndpoint = publishEndpoint;
}
public async Task Consume(ConsumeContext<MyMessage> context)
{
await _publishEndpoint.Publish(new OtherMessage()); // is being sent through outbox even if I don't call _dbContext.SaveChangesAsync()
}
}
这很烦人。有没有办法在消费者内部实现 BusOutbox 行为?我只想在显式调用 _dbContext.SaveChangesAsync() 时以原子方式发送消息,并且不希望创建事务“包装”Consume 方法。
使用大众运输 8.2.0。
简短的回答,不,您必须使用文档中概述的消费者发件箱。您不能使用消费者中的“公交车”发件箱,MassTransit 有意控制交易以确保准确一次交付。