在同一进程中使用 MassTransit 运行多个 Web 应用程序时出现奇怪的事务发件箱行为

问题描述 投票:0回答:1

我有多个 Web 应用程序,每个应用程序都有自己单独的 DI 容器,在同一进程中运行(侦听不同端口)。每个应用程序将 MassTransit(使用 RabbitMq)添加到具有事务发件箱的 DI,并且每个应用程序都有自己的 DbContext(尽管所有应用程序都使用相同的物理 PG 数据库,只是不同的 DB 模式)。

以下是我如何通过发件箱注册 MT:

services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddConsumers(typeof(UpdateSubscriptionStatusOnSubscriptionStatusPublished).Assembly);

    busConfigurator.UsingRabbitMq((context, rabbitCfg) =>
    {
        rabbitCfg.ConfigureJsonSerializerOptions(jsonSerializerOptions =>
        {
            jsonSerializerOptions.ConfigureForNodaTime(new NodaJsonSettings());

            return jsonSerializerOptions;
        });

        rabbitCfg.Host(connectionUri);
        rabbitCfg.ConfigureEndpoints(context);
    });
    
    busConfigurator.AddEntityFrameworkOutbox<DatabaseContext>(o =>
    {
        o.UsePostgres();
        o.UseBusOutbox();
    });
});

这是我如何将发件箱实体添加到数据库上下文

protected override void OnModelCreating(ModelBuilder builder)
{
    builder.HasDefaultSchema(InfrastructureConstants.Schema);
    
    builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
    
    builder.AddTransactionalOutboxEntities();
    
    base.OnModelCreating(builder);
}

到目前为止,它工作正常,每个应用程序在其数据库模式中都有自己的发件箱实体,每个应用程序都有自己的 MT 发件箱托管服务,该服务不断检查收件箱/发件箱,如下面的日志所示:

[09:17:30 INF SUBSCRIPTIONS] 执行 DbCommand (40ms) [参数=[], CommandType='Text', CommandTimeout='30'] 选择 m.outbox_id、m.created、m.delivered、m.last_sequence_number、m.lock_id、m.row_version 从 ( SELECT * FROM "subscriptions"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) 米 LIMIT 2(命令)

...

[09:17:30 INF IDENTITY]执行DbCommand(40ms)[参数=[],CommandType='Text',CommandTimeout='30'] 选择 m.outbox_id、m.created、m.delivered、m.last_sequence_number、m.lock_id、m.row_version 从 ( SELECT * FROM "identity"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) 米 LIMIT 2(命令)

...

[09:17:30 INF IDENTITY] 执行 DbCommand (92ms) [参数=[@__removeTimestamp_0='?' (DbType = 日期时间), @__p_1='?' (DbType = Int32)], CommandType='文本', CommandTimeout='30'] SELECT i.id、i.consumed、i.consumer_id、i.delivered、i.expiration_time、i.last_sequence_number、i.lock_id、i.message_id、i.receive_count、i.received、i.row_version 来自 identity.inbox_state AS i 其中 i.delivered 不为 NULL 并且 i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)

...

[09:17:30 INF SUBSCRIPTIONS] 执行 DbCommand (92ms) [参数=[@__removeTimestamp_0='?' (DbType = 日期时间), @__p_1='?' (DbType = Int32)], CommandType='文本', CommandTimeout='30'] SELECT i.id、i.consumed、i.consumer_id、i.delivered、i.expiration_time、i.last_sequence_number、i.lock_id、i.message_id、i.receive_count、i.received、i.row_version 来自 subscriptions.inbox_state AS i 其中 i.delivered 不为 NULL 并且 i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)

但是,当我将以下代码添加到其中一个应用程序(在本例中为订阅)时,会发生一些奇怪的事情:

app.Lifetime.ApplicationStarted.Register(async () =>
{
    using var scope = app.Services.CreateScope();
    var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
    var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
    await publishEndpoint.Publish(new RefreshPlans());
    await unitOfWork.SaveChangesAsync();
});

添加此代码后,所有应用程序开始使用“订阅”。“outbox_state”,但仍使用正确的 inbox_state:

[09:32:25 INF IDENTITY] 执行 DbCommand (1ms) [参数=[], CommandType='Text', CommandTimeout='30'] 选择 m.outbox_id、m.created、m.delivered、m.last_sequence_number、m.lock_id、m.row_version 从 ( SELECT * FROM "subscriptions"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) 米 LIMIT 2(命令)

...

[09:32:25 INF IDENTITY] 执行 DbCommand (2ms) [参数=[@__removeTimestamp_0='?' (DbType = 日期时间), @__p_1='?' (DbType = Int32)], CommandType='文本', CommandTimeout='30'] SELECT i.id、i.consumed、i.consumer_id、i.delivered、i.expiration_time、i.last_sequence_number、i.lock_id、i.message_id、i.receive_count、i.received、i.row_version 来自 identity.inbox_state AS i 其中 i.delivered 不为 NULL 并且 i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)

...

// 订阅工作正常并使用相同的收件箱/发件箱

来自 IDENTITY 应用程序的所有消息仍正确保存到“identity.outbox_state”中,但它们永远不会传递到代理,因为 IDENTITY 由于某种原因检查“subscriptions.outbox_state”。

这很奇怪,因为 MT / DbContext / 每个应用程序的所有内容都在单独的 DI 中,所以对我来说这看起来像是错误(或者我缺少一些配置?)。看起来发件箱配置的某些部分被缓存/保存在静态内存中,而不是局限于 DI 容器?

<PackageVersion Include="MassTransit" Version="8.1.3" />
<PackageVersion Include="MassTransit.Abstractions" Version="8.1.3" />
<PackageVersion Include="MassTransit.EntityFrameworkCore" Version="8.1.3" />
<PackageVersion Include="MassTransit.RabbitMQ" Version="8.1.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.1" />

使用.net core 8。

masstransit
1个回答
0
投票

https://github.com/MassTransit/MassTransit/issues/5125

使用多个 DbContext 时,必须禁用架构缓存:

 busConfigurator.AddEntityFrameworkOutbox<TDbContext>(o =>
            {
                // instead of o.UsePostgres();
                o.LockStatementProvider = new PostgresLockStatementProvider(false);

                o.UseBusOutbox();
            });
© www.soinside.com 2019 - 2024. All rights reserved.