我有两个微服务,分别是rabbitmq和masstransit v8最新版本,我将请求从微A发送到微B并从数据库获取数据然后响应结果。 在应用程序中,一切都很好,但在与 inmemorytestharness 的集成测试中,我收到此错误 MassTransit.RequestFaultException :EventBus.Messages.Contracts.Services.Discounts.Currency.Query.GetCurrencyByName.GetCurrencyByNameRequest 请求错误:无法访问已处置的对象。
请求消息:
public class GetCurrencyByNameRequest
{
public string Name { get; set; }
}
回复留言:
public class GetCurrencyByNameResponse
{
public string Id { get; set; }
public string Name { get; set; }
}
消费者:
public class GetCurrencyByNameConsumer : IConsumer<GetCurrencyByNameRequest>
{
private readonly IUnitOfWork _uow;
public GetCurrencyByNameConsumer(IUnitOfWork uow)
{
_uow = uow;
}
public async Task Consume(ConsumeContext<GetCurrencyByNameRequest> context)
{
var filter = Builders<Currency>.Filter.Eq(x => x.CurrencyName, new CurrencyName(context.Message.Name));
var currency = await _uow.GenericRepository<Currency>()
.GetSingleDocumentByFilterAsync(filter, context.CancellationToken);
if (currency == null)
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = string.Empty,
Name = string.Empty
});
}
else
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = currency.Id,
Name = currency.CurrencyName
});
}
}
}
微 A 中的公共交通配置:
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Payment.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
//cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
微型 B 中的公共交通配置
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Discount.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
// cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
大众交通内存测试线束配置
public static InMemoryTestHarness TestHarness { get; set; } = default!;
public static IBus TestHarnessBus { get; set; } = default!;
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
.AddScoped<IUnitOfWork, UnitOfWork>()
.AddMassTransitInMemoryTestHarness(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<GetCurrencyByNameConsumer>();
// .Endpoint(x=>x.Name = "get-currency-by-name-request");
x.AddConsumerTestHarness<GetCurrencyByNameConsumer>();
})
.AddGenericRequestClient()
.BuildServiceProvider(true);
TestHarness = provider.GetRequiredService<InMemoryTestHarness>();
await TestHarness.Start().ConfigureAwait(false);
TestHarnessBus = provider.GetRequiredService<IBus>();
}
集成测试
[Test]
public async Task CreatePackageCommand_InsertToDb_ReturnPackage()
{
IRequestClient<GetCurrencyByNameRequest>? client = TestHarnessBus.CreateRequestClient<GetCurrencyByNameRequest>();
await Task.Delay(20000);
var req = new GetCurrencyByNameRequest
{
Name = "Dollar"
};
//error occurs in this line
var response = await client.GetResponse<GetCurrencyByNameResponse>(req);
(await TestHarness.Sent.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
(await TestHarness.Consumed.Any<GetCurrencyByNameResponse>()).Should().BeTrue();
var consumerHarness = GetConsumerTestHarness<GetCurrencyByNameConsumer>();
(await consumerHarness.Consumed.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
var command = new CreatePackageCommand
{
IsActive = true,
Description = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Sort = 1,
Name = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Price = 999,
CurrencyName = "Dollar"
};
var packageId = await SendAsync(command);
}
嗯。
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
一旦该方法退出,它将处理掉服务提供者。