我有多个 Web 应用程序,每个应用程序都有自己单独的 DI 容器,在同一进程中运行(侦听不同端口)。每个应用程序将 MassTransit(使用 RabbitMq)添加到具有事务发件箱的 DI,并且每个应用程序都有自己的 DbContext(尽管所有应用程序都使用相同的物理 PG 数据库,只是不同的 DB 模式)。
以下是我如何通过发件箱注册 MT:
services.AddMassTransit(busConfigurator =>
{
busConfigurator.AddConsumers(typeof(UpdateSubscriptionStatusOnSubscriptionStatusPublished).Assembly);
busConfigurator.UsingRabbitMq((context, rabbitCfg) =>
{
rabbitCfg.ConfigureJsonSerializerOptions(jsonSerializerOptions =>
{
jsonSerializerOptions.ConfigureForNodaTime(new NodaJsonSettings());
return jsonSerializerOptions;
});
rabbitCfg.Host(connectionUri);
rabbitCfg.ConfigureEndpoints(context);
});
busConfigurator.AddEntityFrameworkOutbox<DatabaseContext>(o =>
{
o.UsePostgres();
o.UseBusOutbox();
});
});
这是我如何将发件箱实体添加到数据库上下文
protected override void OnModelCreating(ModelBuilder builder)
{
builder.HasDefaultSchema(InfrastructureConstants.Schema);
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
builder.AddTransactionalOutboxEntities();
base.OnModelCreating(builder);
}
到目前为止,它工作正常,每个应用程序在其数据库模式中都有自己的发件箱实体,每个应用程序都有自己的 MT 发件箱托管服务,该服务不断检查收件箱/发件箱,如下面的日志所示:
[09:17:30 INF SUBSCRIPTIONS] 执行 DbCommand (40ms) [参数=[], CommandType='Text', CommandTimeout='30'] 选择 m.outbox_id、m.created、m.delivered、m.last_sequence_number、m.lock_id、m.row_version 从 ( SELECT * FROM "subscriptions"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) 米 LIMIT 2(命令)...
[09:17:30 INF IDENTITY]执行DbCommand(40ms)[参数=[],CommandType='Text',CommandTimeout='30'] 选择 m.outbox_id、m.created、m.delivered、m.last_sequence_number、m.lock_id、m.row_version 从 ( SELECT * FROM "identity"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) 米 LIMIT 2(命令)
...
[09:17:30 INF IDENTITY] 执行 DbCommand (92ms) [参数=[@__removeTimestamp_0='?' (DbType = 日期时间), @__p_1='?' (DbType = Int32)], CommandType='文本', CommandTimeout='30'] SELECT i.id、i.consumed、i.consumer_id、i.delivered、i.expiration_time、i.last_sequence_number、i.lock_id、i.message_id、i.receive_count、i.received、i.row_version 来自 identity.inbox_state AS i 其中 i.delivered 不为 NULL 并且 i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)
...
[09:17:30 INF SUBSCRIPTIONS] 执行 DbCommand (92ms) [参数=[@__removeTimestamp_0='?' (DbType = 日期时间), @__p_1='?' (DbType = Int32)], CommandType='文本', CommandTimeout='30'] SELECT i.id、i.consumed、i.consumer_id、i.delivered、i.expiration_time、i.last_sequence_number、i.lock_id、i.message_id、i.receive_count、i.received、i.row_version 来自 subscriptions.inbox_state AS i 其中 i.delivered 不为 NULL 并且 i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)
但是,当我将以下代码添加到其中一个应用程序(在本例中为订阅)时,会发生一些奇怪的事情:
app.Lifetime.ApplicationStarted.Register(async () =>
{
using var scope = app.Services.CreateScope();
var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
await publishEndpoint.Publish(new RefreshPlans());
await unitOfWork.SaveChangesAsync();
});
添加此代码后,所有应用程序开始使用“订阅”。“outbox_state”,但仍使用正确的 inbox_state:
[09:32:25 INF IDENTITY] 执行 DbCommand (1ms) [参数=[], CommandType='Text', CommandTimeout='30'] 选择 m.outbox_id、m.created、m.delivered、m.last_sequence_number、m.lock_id、m.row_version 从 ( SELECT * FROM "subscriptions"."outbox_state" ORDER BY "created" LIMIT 1 FOR UPDATE SKIP LOCKED ) 米 LIMIT 2(命令)...
[09:32:25 INF IDENTITY] 执行 DbCommand (2ms) [参数=[@__removeTimestamp_0='?' (DbType = 日期时间), @__p_1='?' (DbType = Int32)], CommandType='文本', CommandTimeout='30'] SELECT i.id、i.consumed、i.consumer_id、i.delivered、i.expiration_time、i.last_sequence_number、i.lock_id、i.message_id、i.receive_count、i.received、i.row_version 来自 identity.inbox_state AS i 其中 i.delivered 不为 NULL 并且 i.delivered < @__removeTimestamp_0 ORDER BY i.delivered LIMIT @__p_1 (Command)
...
// 订阅工作正常并使用相同的收件箱/发件箱
来自 IDENTITY 应用程序的所有消息仍正确保存到“identity.outbox_state”中,但它们永远不会传递到代理,因为 IDENTITY 由于某种原因检查“subscriptions.outbox_state”。
这很奇怪,因为 MT / DbContext / 每个应用程序的所有内容都在单独的 DI 中,所以对我来说这看起来像是错误(或者我缺少一些配置?)。看起来发件箱配置的某些部分被缓存/保存在静态内存中,而不是局限于 DI 容器?
<PackageVersion Include="MassTransit" Version="8.1.3" />
<PackageVersion Include="MassTransit.Abstractions" Version="8.1.3" />
<PackageVersion Include="MassTransit.EntityFrameworkCore" Version="8.1.3" />
<PackageVersion Include="MassTransit.RabbitMQ" Version="8.1.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.1" />
使用.net core 8。
https://github.com/MassTransit/MassTransit/issues/5125
使用多个 DbContext 时,必须禁用架构缓存:
busConfigurator.AddEntityFrameworkOutbox<TDbContext>(o =>
{
// instead of o.UsePostgres();
o.LockStatementProvider = new PostgresLockStatementProvider(false);
o.UseBusOutbox();
});