我有一个大型项目,遵循干净的架构解决方案布局和一些领域驱动设计/CQRS 方法。我非常虔诚地关注 Milan Jovanovic 早期的干净建筑 Youtube 视频,如果这有帮助的话,我会来到这里。在集成 Mass Transit 之前,我使用了他的发件箱模式方法,该方法使用 EF 核心拦截器来拦截 SavingChangesAsync 方法,提取该事务期间域引发的事件并将它们保存到发件箱表中,如下所示:
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
InterceptionResult<int> result, CancellationToken cancellationToken = default)
{
var context = eventData.Context;
if (context is null)
return base.SavingChangesAsync(eventData, result, cancellationToken);
var events = context.ChangeTracker
.Entries<AggregateRoot>()
.Select(a => a.Entity)
.Where(e => e.GetDomainEvents() is not null)
.SelectMany(e =>
{
var domainEvents = e.GetDomainEvents();
e.ClearDomainEvents();
return domainEvents;
});
var messages = events
.Where(e => e is not null)
.Select(e => new OutboxMessage()
{
Id = Guid.NewGuid(),
OccurredOnUtc = DateTime.UtcNow,
Type = e.GetType().Name,
Content = JsonConvert.SerializeObject(e, new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.All
})
})
.ToList();
context.Set<OutboxMessage>().AddRange(messages);
return base.SavingChangesAsync(eventData, result, cancellationToken);
}
这些条目稍后会被 Quartz 作业获取,该作业每 X 秒运行一次,并使用 MediatR 的通知发布事件。我不会详细介绍当前的方法,因为它绝对完美无缺。
我决定用 Mass Transit / RabbitMQ 取代这个伟大的方法,使其成为真正的 pub/sub,并消除 10 秒的延迟等问题。这是我注册公共交通的方法:
private static void ConfigureMessageQueue(IServiceCollection services, IConfiguration configuration)
{
services.Configure<MessageBrokerSettings>(configuration.GetSection("MessageBroker"));
services.AddSingleton(sp =>
sp.GetRequiredService<IOptions<MessageBrokerSettings>>().Value);
services.AddMassTransit(busConfigurator =>
{
busConfigurator.SetKebabCaseEndpointNameFormatter();
busConfigurator.AddConsumersFromNamespaceContaining<ISomeApplicationMarker>();
busConfigurator.UsingRabbitMq((context, configurator) =>
{
var settings = context.GetRequiredService<MessageBrokerSettings>();
configurator.Host(new Uri(settings.Host), h =>
{
h.Username(settings.Username);
h.Password(settings.Password);
});
configurator.ConfigureEndpoints(context);
});
});
services.AddTransient<IEventBus, EventBus>();
}
事件总线只是 IBus 接口的抽象,以尽可能地将我的应用程序代码与 Mass Transit / RabbitMQ 解耦:
internal sealed class EventBus : IEventBus
{
private readonly IBus _bus;
private readonly ILogger<EventBus> _logger;
public EventBus(IBus bus, ILogger<EventBus> logger)
{
_bus = bus;
_logger = logger;
}
public async Task PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
where TMessage : class, IDomainEvent
{
_logger.LogDebug("Publishing event {@Event}", message);
await _bus.Publish(message, cancellationToken);
}
}
为了简洁起见,我不会向消费者展示它们,但它们只是正确事件类型的 IConsumer 实现。
我制作了一个新的 Ef Core 拦截器,它执行几乎相同的操作,只不过它使用事件总线将事件发布到 RabbitMQ,而不是将其保存在发件箱表中。
这就是奇怪的地方。
如果我直接从 MediatR 处理程序发布事件(因此完全跳过新的拦截器),则一切正常。消息发送到 RabbitMQ 并返回给我的消费者并执行我想要的操作。
如果我尝试使用我的拦截器,执行会在正确的时间进入它,我看到事件总线的日志语句表明事件已发布,我看到消息击中 RabbitMQ 但它永远不会触发消费者!据我所知,有关代理的所有内容都已正确配置,因为它在从 MediatR 处理程序直接调用 Publish 时起作用。我看到 Mass Transit 的日志记录表明,当应用程序启动时,端点已配置并连接到正确的消费者,RabbitMQ 正确显示交换和队列,一切都是一样的(显然,因为与调用时相比,没有任何变化) MediatR 处理程序)。
抱歉,这需要处理很多事情,但这对我来说确实毫无意义。如果有帮助的话,我很乐意提供更多代码。谢谢
MassTransit 有自己的事务发件箱,应该使用它来代替用于 MediatR 的自定义实现。有一个可用示例和随附的视频。