MassTransit.RequestFaultException:“消息”请求出错:无法访问已处置的对象

问题描述 投票:0回答:1

我有两个微服务,分别是rabbitmq和masstransit v8最新版本,我将请求从微A发送到微B并从数据库获取数据然后响应结果。 在应用程序中,一切都很好,但在与 inmemorytestharness 的集成测试中,我收到此错误 MassTransit.RequestFaultException :EventBus.Messages.Contracts.Services.Discounts.Currency.Query.GetCurrencyByName.GetCurrencyByNameRequest 请求错误:无法访问已处置的对象。

请求消息:

public class GetCurrencyByNameRequest
{
    public string Name { get; set; }
}

回复留言:

public class GetCurrencyByNameResponse
{
    public string Id { get; set; } 
    public string Name { get; set; } 
}

消费者:

public class GetCurrencyByNameConsumer : IConsumer<GetCurrencyByNameRequest>
{
    private readonly IUnitOfWork _uow;

    public GetCurrencyByNameConsumer(IUnitOfWork uow)
    {
        _uow = uow;
    }
    public async Task Consume(ConsumeContext<GetCurrencyByNameRequest> context)
    {
        var filter = Builders<Currency>.Filter.Eq(x => x.CurrencyName, new CurrencyName(context.Message.Name));
        var currency = await _uow.GenericRepository<Currency>()
            .GetSingleDocumentByFilterAsync(filter, context.CancellationToken);
        if (currency == null)
        {
            await context.RespondAsync(new GetCurrencyByNameResponse
            {
                Id = string.Empty,
                Name = string.Empty
            });
        }
        else
        {
            await context.RespondAsync(new GetCurrencyByNameResponse
            {
                Id = currency.Id,
                Name = currency.CurrencyName
            });
        }
    }
}

微 A 中的公共交通配置:

   services.AddMassTransit(x =>
        {
            var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
                .FirstOrDefault(x => x.FullName.Contains("Payment.Application"));
            x.SetKebabCaseEndpointNameFormatter();
            x.AddConsumers(entryAssembly);
            x.UsingRabbitMq((context, cfg) =>
            {
                //cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
                cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
                {
                    h.Username(configuration["RabbitMqSettings:Username"]);
                    h.Password(configuration["RabbitMqSettings:Password"]);
                });
                cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
                cfg.UseMessageRetry(r => r.Immediate(5));
                cfg.UseInMemoryOutbox();
                cfg.ConfigureEndpoints(context);
            });
        });

微型 B 中的公共交通配置

  services.AddMassTransit(x =>
        {
            var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
                .FirstOrDefault(x => x.FullName.Contains("Discount.Application"));
            x.SetKebabCaseEndpointNameFormatter();
            x.AddConsumers(entryAssembly);
            x.UsingRabbitMq((context, cfg) =>
            {
               // cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
                cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
                {
                    h.Username(configuration["RabbitMqSettings:Username"]);
                    h.Password(configuration["RabbitMqSettings:Password"]);
                });
                cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
                cfg.UseMessageRetry(r => r.Immediate(5));
                cfg.UseInMemoryOutbox();
                cfg.ConfigureEndpoints(context);
            });
        });

大众交通内存测试线束配置

 public static InMemoryTestHarness TestHarness { get; set; } = default!;
    public static IBus TestHarnessBus { get; set; } = default!;
    public static async Task UseHarnessAsync()
    {
        await using var provider = new ServiceCollection()
                .AddScoped<IUnitOfWork, UnitOfWork>()
              

                .AddMassTransitInMemoryTestHarness(x =>
                        {
                            x.SetKebabCaseEndpointNameFormatter();
                            x.AddConsumer<GetCurrencyByNameConsumer>();
                            //                      .Endpoint(x=>x.Name = "get-currency-by-name-request");

                            x.AddConsumerTestHarness<GetCurrencyByNameConsumer>();
                           
                        })

                .AddGenericRequestClient()
                .BuildServiceProvider(true);

        TestHarness = provider.GetRequiredService<InMemoryTestHarness>();
        await TestHarness.Start().ConfigureAwait(false); 
        TestHarnessBus = provider.GetRequiredService<IBus>();

    }

集成测试

    [Test]
    public async Task CreatePackageCommand_InsertToDb_ReturnPackage()
    {
        IRequestClient<GetCurrencyByNameRequest>? client = TestHarnessBus.CreateRequestClient<GetCurrencyByNameRequest>();
        await Task.Delay(20000);
        var req = new GetCurrencyByNameRequest
        {
            Name = "Dollar"
        };
        //error occurs in this line
        var response = await client.GetResponse<GetCurrencyByNameResponse>(req);

        (await TestHarness.Sent.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
        (await TestHarness.Consumed.Any<GetCurrencyByNameResponse>()).Should().BeTrue();

        var consumerHarness = GetConsumerTestHarness<GetCurrencyByNameConsumer>();

        (await consumerHarness.Consumed.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
        var command = new CreatePackageCommand
        {
            IsActive = true,
            Description = new Translation
            {
                Arabic = StringHelper.RandomString(10),
                English = StringHelper.RandomString(10),
                Persian = StringHelper.RandomString(10),
            },
            Sort = 1,
            Name = new Translation
            {
                Arabic = StringHelper.RandomString(10),
                English = StringHelper.RandomString(10),
                Persian = StringHelper.RandomString(10),
            },
            Price = 999,
            CurrencyName = "Dollar"
        };
        var packageId = await SendAsync(command);

    }
testing asp.net-web-api rabbitmq integration-testing masstransit
1个回答
0
投票

嗯。

public static async Task UseHarnessAsync()
    {
        await using var provider = new ServiceCollection()

一旦该方法退出,它将处理掉服务提供者。

© www.soinside.com 2019 - 2024. All rights reserved.