MassTransit - 找不到指定地址的主机

问题描述 投票:0回答:0

我正在尝试实现一个集成测试场景,其中 IIS 主机内的组件 A 使用 IRequestClient 通过 RabbitMQ/Masstransit 总线从组件 B(外部组件)请求一些数据。组件 B 有自己的总线实例在运行。我的目标是通过在测试中注册一个替代消费者来“模拟”组件 B 上的目标消费者,并让它响应相同的请求。

生产代码工作正常。但是当谈到这个测试时,替换消费者被调用,我可以调试它,但是一旦调用

await context.RespondAsync(..);
,我就会得到以下异常:

MassTransit.Messages Error: 0 : R-FAULT rabbitmq://localhost:0/myVHost/Command.Bus.Messages.Data.IGetDataRequest_0.0.0.0 7a000000-9a3c-0005-b155-08db518a24c8 Bus.Messages.Data.IGetDataRequest MassTransit.MessageHandler<Bus.Messages.Data.IGetDataRequest>(00:00:03.2257322) The host was not found for the specified address: rabbitmq://localhost:0/myVHost/bus-MYLOCALHOST-testhost-xeyyyyr48oyym8zfbdpidntbrh?durable=false&autodelete=true, MassTransit.EndpointNotFoundException: The host was not found for the specified address: rabbitmq://localhost:0/myVHost/bus-MYLOCALHOST-testhost-xeyyyyr48oyym8zfbdpidntbrh?durable=false&autodelete=true
         at MassTransit.RabbitMqTransport.Transport.SendTransportProvider.GetSendTransport(Uri address)
         at MassTransit.RabbitMqTransport.Transport.SendTransportProvider.MassTransit.ISendTransportProvider.GetSendTransport(Uri address)
         at MassTransit.Transports.SendEndpointProvider.CreateSendEndpoint(Uri address)
         at MassTransit.Transports.SendEndpointCache`1.GetSendEndpointFromFactory(TKey address, SendEndpointFactory`1 factory)
         at GreenPipes.Caching.Internals.PendingValue`2.CreateValue()
         at MassTransit.Transports.SendEndpointCache`1.GetSendEndpoint(TKey key, SendEndpointFactory`1 factory)
         at MassTransit.Context.BaseConsumeContext.GetSendEndpoint(Uri address)
         at MassTransit.Context.BaseConsumeContext.RespondAsync[T](T message, IPipe`1 sendPipe)
at MyTest.Helpers.Consumers.TestDataRequestConsumer.Consume(ConsumeContext`1 context) in C:\Projects\MyTest\Integration.MyTest\Helpers\Consumers\TestDataRequestConsumer.cs:line 56
   at RBus.MessageHandler.<Setup>b__3_0[T](ConsumeContext`1 c)
   at MassTransit.Pipeline.Filters.HandlerMessageFilter`1.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

这是测试设置:

    [OneTimeSetUp]
    public async Task SetUp()
    {
       var webApp = new WebApplicationFactory<Startup.IisHost.Startup>()
                .WithWebHostBuilder(builder =>
                {
                    builder.UseTestServer();

                    builder.ConfigureTestServices(services =>
                    {
                        services.Configure<DatabaseOptions>(o => o.ConnectionString = testEnvironment.DbConnectionString);
                        services.Configure<CustomerClientOptions>(configuration.GetSection("Customer"));
                        services.AddSingleton(subscriptionHelper);
                    });
                });
        
        httpClient = webApp.CreateClient();

        var serviceCollection = new ServiceCollection();
        serviceCollection.Configure<CustomerClientOptions>(configuration.GetSection("Customer"));
        serviceCollection.AddCustomerClient();
        serviceCollection.AddTransient<IIdentityProvider, IdentityProvider>();
        serviceCollection.AddBus(configuration.GetSection("BusConfig"), x =>
        {
            x.WithIdentityProvider((provider, id) => provider.GetRequiredService<IIdentityProvider>().ProvideIdentityByCustomerId(id));
            x.WithRequestQueue<IGetDataRequest, TestDataRequestConsumer>();
        });

        var serviceProvider = serviceCollection.BuildServiceProvider();
        var busControl = serviceProvider.GetService<IBusControl>();

        await busControl.StartAsync().ConfigureAwait(false);

        // Further code is irrelevant and removed for brevity
    }

Startup.IisHost.Startup
ConfigureServices 方法中,我注册了在构造函数中注入了 IRequestClient 的客户端,然后添加了总线。

        services.AddTransient<IDataClient, DataClient>();
        services.AddBus(Configuration.GetSection("Bus"), x =>
        {
            x.WithIdentityProvider((provider, customerId) => provider.GetRequiredService<IIdentityProvider>().ProvideIdentityByCustomerId(customerId));
        });
        services.AddHostedService<BusService>();

“实际”总线是从 IHostedService 中启动的:

    public BusService(ILogger<BusService> logger, IBusControl busControl)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.busControl = busControl ?? throw new ArgumentNullException(nameof(busControl));
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        try
        {
            await busControl.StartAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            logger.LogCritical(e, "Error while starting IIS Bus service");
            throw;
        }
    }

这里是请求客户:

internal class DataClient : IDataClient
{
    private readonly IRequestClient requestClient;

    public DataClient (IRequestClient requestClient)
    {
        this.requestClient = requestClient ?? throw new ArgumentNullException(nameof(requestClient));
    }

    /// <inheritdoc/>
    public async Task<IDictionary<SomeData, OtherData>> GetData(IEnumerable<string> names, IEnumerable<int> ids)
    {
        if (names == null) throw new ArgumentNullException(nameof(names));
        if (ids == null) throw new ArgumentNullException(nameof(ids));

        var namesList = names.ToList();
        var idsList = ids.ToList();

        if (!idsList.Any())
        {
            return new Dictionary<SomeData, OtherData>();
        }

        var response = await requestClient.Request<IGetDataRequest, IGetDataResponse>(new
        {
            Names = namesList ,
            Ids = idsList 
        }).ConfigureAwait(false);

        var riiDeviceCredentialsResponse = response.Message;

        ...
    }
}

这是消费者(与组件 B 中的消费者相同):

public class TestDataRequestConsumer: IConsumer<IGetDataRequest>
{
    private readonly GetDataOptions options;

    public TestDataRequestConsumer(IOptions<GetDataOptions> options)
    {
        if (options == null) throw new ArgumentNullException(nameof(options));
        this.options = options.Value;
    }

    public async Task Consume(ConsumeContext<IGetDataRequest> context)
    {
        // Do some stuff..

        await context.RespondAsync(response).ConfigureAwait(false);
    }
}

测试和组件 B 都使用相同的总线配置:

"UseInMemory": false,
"HostScheme": "rabbitmq",
"HostName": "localhost",
"HostPort": 5672,
"Username": "myuser",
"Password": "mypassword",
"VHostName": "myVHost",
"ServerVersion": "0.0.0.0",
"NumberOfRetry": 3,
"SecBetweenRetry": 2,
"PrefetchCount": 4,

当我查看 RabbitMQ 管理控制台时,我可以看到消息首先被添加到正确的队列中,但随后被移动到错误队列中。

我正在使用的专有总线库正在使用 MassTransit 5.1.5.1633.

我不太确定问题出在哪里。任何帮助和/或进一步调查的指示将不胜感激。

如果需要更多信息,请告诉我。

.net rabbitmq nunit masstransit
© www.soinside.com 2019 - 2024. All rights reserved.