刚刚从 MassTransit 6.3.2 升级到 7.2.4。我正在使用 .Net Core 5。
以下单元测试在升级前工作正常,但在升级后失败。
using var harness = new InMemoryTestHarness();
harness.Consumer(() => new MockedxxxService(), xxxEndPoint);
harness.Start().Wait();
IBus endpoint = harness.Bus;
var tasks = new System.Collections.Concurrent.ConcurrentBag<Task<string>>();
var result = Parallel.For(0, 100, index =>
{
var sut = new xxxRetriever(..., endpoint, ...);
tasks.Add(sut.Getxxx(paramx));
});
Assert.True(result.IsCompleted);
await Task.WhenAll(tasks);
xxxRetriever 类中的代码如下所示。
public async Task<string> Getxxx(string paramx)
{
try
{
var serviceAddress = new Uri("queue:" + ...);
var xxxService = _busService.CreateRequestClient<xxxContract>(serviceAddress, TimeSpan.FromMilliseconds(xxx));
var xxxResponse = await xxxService.GetResponse<xxxResultContract>(new
{
...
}).ConfigureAwait(false);
....
}
}
端点作为 IBus 注入到类中。
模拟的服务看起来像这样。
public class MockedxxxService : IConsumer<xxxContract>
{
public async Task Consume(ConsumeContext<xxxContract> context)
{
await context.RespondAsync<xxxResultContract>(new { ... } } });
}
}
当我们将任务数量限制为 30 左右时,测试运行良好。但超过此范围,测试就会失败并始终显示消息“超时等待响应,RequestId:...”。
如有任何帮助,我们将不胜感激。
这可能与 TPL 中的争用有关,但这只是猜测。您也许可以配置增加总线上的并发限制,看看是否有帮助:
harness.OnConfigureInMemoryBus += c =>
{
c.Host(h => h.TransportConcurrencyLimit = 100);
};
这是一个猜测,但它可能是相关的,因为它是特定于负载的。
显然,这应该在调用
Start
之前调用硬度(应该是 await
,而不是顺便使用 .Wait()
)。
经过大量调查,我们发现这是由.Net 5处理线程创建的方式引起的。请参阅 ThreadPool.SetMinThreads 不会创建任何新线程
我们在测试前启动了线程池并解决了问题。不知道为什么这在 MassTransit 6 下的 .Net 5 中可以正常工作,但现在可以正常工作了。