在公共交通中通过路由密钥拆分邮件

问题描述 投票:0回答:1

我想根据路由键将消息发送到适当的队列。从生产者应用程序开始,我的代码(相关部分)为:

var options = serviceProvider
    .GetService<IConfiguration>()
    .GetOptions<RabbitMqProducerOptions>("RabbitMqProducer");

foreach (var option in options?.Endpoints)
{
    var method = typeof(EndpointConvention).GetMethod("Map", new[] { typeof(Uri) });
    var type = Assembly.Load(option.Assembly).GetTypes().First(t => t.Name == option.Type);
    var genericMethod = method.MakeGenericMethod(new[] { type });

    genericMethod.Invoke(null, new[] { new Uri($"{options.Address}/{option.Name}") });
}

Bus.Factory.CreateUsingRabbitMq(cfg => {
    cfg.Host(options.Address);
    cfg.Send<CreateProducts>(x => x.UseRoutingKeyFormatter(context => context.Message.Platform));
});

以上是可以的-它创建了我在配置文件中声明的交换(扇出交换,如果那很重要)。现在,消费者配置:

var options = serviceProvider.GetService<IConfiguration>().GetOptions<RabbitMqConsumerOptions>("RabbitMqConsumer");

Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(options.Address);

    foreach (var kvp in options.Endpoints)
    {
        cfg.ReceiveEndpoint("ingest-products", ep =>
        {
            ep.PrefetchCount = kvp.Value.PrefetchCount;
            ep.BindMessageExchanges = false;

            ep.UseMessageRetry(r => r.Interval(kvp.Value.RetryCount, kvp.Value.RetryInterval));
            ep.Bind("ingest-amazon-products", x => BindForEndpoint(x, kvp.Value.RoutingKey));
            BindExchange(ep, kvp.Value.RoutingKey, kvp.Value.Assembly, kvp.Value.Type);
            ep.ConfigureConsumers(serviceProvider);
        });
    }
});

现在,上面的代码可以工作,但不能达到我的预期目的,因为没有匹配路由键的消息仍会传递给我的使用者。我的意思是-如果kvp.Value.RoutingKey配置值为X,并且生产者产生的消息具有路由键Y,则监听X的消费者将获得Y消息。如何解决?

c# rabbitmq masstransit
1个回答
0
投票

[发布赏金后,我发现我应该使用Publish界面的BusFactoryConfigurator方法,并且我正在检查它,所以请不要发布任何答案(无法删除问题悬赏金...)。

© www.soinside.com 2019 - 2024. All rights reserved.