从 EF Core 数据库拦截器发布公共交通的问题

问题描述 投票:0回答:1

我有一个大型项目,遵循干净的架构解决方案布局和一些领域驱动设计/CQRS 方法。我非常虔诚地关注 Milan Jovanovic 早期的干净建筑 Youtube 视频,如果这有帮助的话,我会来到这里。在集成 Mass Transit 之前,我使用了他的发件箱模式方法,该方法使用 EF 核心拦截器来拦截 SavingChangesAsync 方法,提取该事务期间域引发的事件并将它们保存到发件箱表中,如下所示:

public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
    InterceptionResult<int> result, CancellationToken cancellationToken = default)
{
    var context = eventData.Context;

    if (context is null)
        return base.SavingChangesAsync(eventData, result, cancellationToken);

    var events = context.ChangeTracker
        .Entries<AggregateRoot>()
        .Select(a => a.Entity)
        .Where(e => e.GetDomainEvents() is not null)
        .SelectMany(e =>
        {
            var domainEvents = e.GetDomainEvents();

            e.ClearDomainEvents();

            return domainEvents;
        });

    var messages = events
        .Where(e => e is not null)
        .Select(e => new OutboxMessage()
        {
            Id = Guid.NewGuid(),
            OccurredOnUtc = DateTime.UtcNow,
            Type = e.GetType().Name,
            Content = JsonConvert.SerializeObject(e, new JsonSerializerSettings
            {
                TypeNameHandling = TypeNameHandling.All
            })
        })
        .ToList();

    context.Set<OutboxMessage>().AddRange(messages);

    return base.SavingChangesAsync(eventData, result, cancellationToken);
}

这些条目稍后会被 Quartz 作业获取,该作业每 X 秒运行一次,并使用 MediatR 的通知发布事件。我不会详细介绍当前的方法,因为它绝对完美无缺。

我决定用 Mass Transit / RabbitMQ 取代这个伟大的方法,使其成为真正的 pub/sub,并消除 10 秒的延迟等问题。这是我注册公共交通的方法:

private static void ConfigureMessageQueue(IServiceCollection services, IConfiguration configuration)
{
    services.Configure<MessageBrokerSettings>(configuration.GetSection("MessageBroker"));

    services.AddSingleton(sp =>
        sp.GetRequiredService<IOptions<MessageBrokerSettings>>().Value);

    services.AddMassTransit(busConfigurator =>
    {
        busConfigurator.SetKebabCaseEndpointNameFormatter();

        busConfigurator.AddConsumersFromNamespaceContaining<ISomeApplicationMarker>();

        busConfigurator.UsingRabbitMq((context, configurator) =>
        {
            var settings = context.GetRequiredService<MessageBrokerSettings>();

            configurator.Host(new Uri(settings.Host), h =>
            {
                h.Username(settings.Username);
                h.Password(settings.Password);
            });

            configurator.ConfigureEndpoints(context);
        });
    });

    services.AddTransient<IEventBus, EventBus>();
}

事件总线只是 IBus 接口的抽象,以尽可能地将我的应用程序代码与 Mass Transit / RabbitMQ 解耦:

internal sealed class EventBus : IEventBus
{
    private readonly IBus _bus;

private readonly ILogger<EventBus> _logger;

public EventBus(IBus bus, ILogger<EventBus> logger)
{
    _bus = bus;
    _logger = logger;
}

public async Task PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
    where TMessage : class, IDomainEvent
{
    _logger.LogDebug("Publishing event {@Event}", message);

    await _bus.Publish(message, cancellationToken);
}
}

为了简洁起见,我不会向消费者展示它们,但它们只是正确事件类型的 IConsumer 实现。

我制作了一个新的 Ef Core 拦截器,它执行几乎相同的操作,只不过它使用事件总线将事件发布到 RabbitMQ,而不是将其保存在发件箱表中。

这就是奇怪的地方。

如果我直接从 MediatR 处理程序发布事件(因此完全跳过新的拦截器),则一切正常。消息发送到 RabbitMQ 并返回给我的消费者并执行我想要的操作。

如果我尝试使用我的拦截器,执行会在正确的时间进入它,我看到事件总线的日志语句表明事件已发布,我看到消息击中 RabbitMQ 但它永远不会触发消费者!据我所知,有关代理的所有内容都已正确配置,因为它在从 MediatR 处理程序直接调用 Publish 时起作用。我看到 Mass Transit 的日志记录表明,当应用程序启动时,端点已配置并连接到正确的消费者,RabbitMQ 正确显示交换和队列,一切都是一样的(显然,因为与调用时相比,没有任何变化) MediatR 处理程序)。

抱歉,这需要处理很多事情,但这对我来说确实毫无意义。如果有帮助的话,我很乐意提供更多代码。谢谢

c# entity-framework-core rabbitmq domain-driven-design masstransit
1个回答
0
投票

MassTransit 有自己的事务发件箱,应该使用它来代替用于 MediatR 的自定义实现。有一个可用示例和随附的视频。

© www.soinside.com 2019 - 2024. All rights reserved.