MassTransit:通过 Kafka 接收消息时避免推送到 RabbitMQ

问题描述 投票:0回答:1

我正在运行一个应用程序,该应用程序充当通过 RabbitMQ 和 Kafka 发送消息的消费者。它不产生任何东西。当应用程序运行时,我注意到通过 Kafka 接收到的消息也被推送到 RabbitMQ 中。这可以避免吗?我可以配置 MassTransit,使 Kafka 消息不推送到 RabbitMQ 吗?

这是我当前的设置

if (!options.RabbitMq.Enabled) 
    return;

busConfigurator.SetKebabCaseEndpointNameFormatter();
busConfigurator.AddConsumer<RabbitMqConsumer, RabbitMqConsumerDefinition>();
busConfigurator.UsingRabbitMq((context, busFactoryConfigurator) =>
{
    busFactoryConfigurator.Host(new Uri(options.RabbitMq.Endpoint));
    busFactoryConfigurator.ConfigureEndpoints(context);
});
if (!options.Kafka.Enabled) 
   return;
        
var schemaRegistryConfig = new SchemaRegistryConfig
{
    Url = options.Kafka.SchemaRegistryUrl
};
var schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
busConfigurator.AddSingleton<ISchemaRegistryClient>(schemaRegistryClient);
        
busConfigurator.AddRider(rider =>
{
    rider.AddConsumer<KafkaMyMessageConsumer>();
    rider.UsingKafka((context, kafkaConfigurator) =>
    {
        kafkaConfigurator.ClientId = options.Kafka.ClientId;
        kafkaConfigurator.Host(options.Kafka.Endpoint, configureHost =>
        {
           // ssl, sasl configuration
        });
        kafkaConfigurator.TopicEndpoint<string, MyMessage>(options.Kafka.Topic, options.Kafka.GroupId, endpointConfigurator =>
        {
           endpointConfigurator.CreateIfMissing(t => t.NumPartitions = 1);
           endpointConfigurator.SetValueDeserializer(new ProtobufDeserializer<MyMessage>().AsSyncOverAsync());
           endpointConfigurator.AutoOffsetReset = AutoOffsetReset.Earliest;
           endpointConfigurator.ConfigureConsumer<KafkaMyMessageConsumer>(context);
        });
    });
});

编辑: 为什么我对 RabbitMQ 和 Kafka 感到困惑,我看到以下日志消息

2023-12-20 23:19:51 [22:19:51 DBG] Endpoint Ready: rabbitmq://my-rabbitmq/kafka/my-kafka-topic/my-service 
2023-12-20 23:19:51 [22:19:51 INF] Bus started: rabbitmq://my-rabbitmq/ 
2023-12-20 23:19:51 [22:19:51 DBG] Partition: my-kafka-topic [[0]] with -1001 was assigned to: my-service-e470f4c3-4581-4a7d-8787-591389c2150b 
2023-12-20 23:19:51 [22:19:51 DBG] RECEIVE rabbitmq://my-rabbitmq/kafka/my-kafka-topic null Foo.V1.MyMessage Bar.KafkaMyMessageConsumer(00:00:00.0014020)

我发现 RabbitMQ 以某种方式与 Kafka-Topic 相关联;但是 RabbitMQ 上没有为 Kafka-Topic 创建交换

我的“修复”是使用

AddMassTransit
两次,使用
IKafkaBus
IRabbitMqBus
并为 Kafka-MassTransit-Integration 调用
busConfigurator.UsingInMemory();
(这会产生类似的日志消息,但使用
loopback
uri 方案)

c# apache-kafka rabbitmq masstransit
1个回答
0
投票

在同一总线实例上从 Kafka 和 RabbitMQ 进行消费工作完全正常,并且 Kafka 和 RabbitMQ 之间没有交互或消息交换。

我不确定你的建议是什么,但你的假设是不正确的。乘客根本不乘坐公交车。

© www.soinside.com 2019 - 2024. All rights reserved.