MassTransit Kafka 和发件箱:没有此类配置属性:“schema.registry.url”异常

问题描述 投票:0回答:1

我已经使用 Kafka 和发件箱模式设置了 MassTransit

var confluentConfiguration = builder.Configuration.GetSection("Kafka").ToConfluentConfiguration();
builder.Services.AddSingleton<ISchemaRegistryClient>(new CachedSchemaRegistryClient(confluentConfiguration));
builder.Services.AddMassTransit(busConfigurator =>
{
     busConfigurator.AddEntityFrameworkOutbox<DatabaseContext>(outboxConfigurator =>
     {
        outboxConfigurator.DuplicateDetectionWindow = outboxOptions.DuplicateDetectionWindow;
        outboxConfigurator.IsolationLevel = outboxOptions.IsolationLevel;
        outboxConfigurator.QueryDelay = outboxOptions.QueryDelay;
        outboxConfigurator.QueryMessageLimit = outboxOptions.QueryMessageLimit;
        outboxConfigurator.QueryTimeout = outboxOptions.QueryTimeout;
            
        outboxConfigurator.UsePostgres();
        outboxConfigurator.UseBusOutbox();
     });

     busConfigurator.UsingInMemory(); // mass transit needs a bus, so we use the in-memory bus

     busConfigurator.AddRider(rider =>
     {
         rider.AddProducer<string, UserCreated>("user-created", (context, config) =>
         {
             config.SetValueSerializer(new ProtobufSerializer<UserCreated>(context.GetRequiredService<ISchemaRegistryClient>()));
         });
            
         rider.UsingKafka(new ProducerConfig(confluentConfiguration), (context, config) =>
         {
             config.UseSendFilter(typeof(CustomCloudEventsHeaderFilter<>), context);
         });
     });
});

然后,我将

ITopicProducer<string, UserCreated > userCreatedProducer
注入到我的minimal-api方法中并调用
await userCreatedProducer.Produce(...)
。这会产生以下异常

System.InvalidOperationException: No such configuration property: "schema.registry.url"
   at Confluent.Kafka.Impl.SafeConfigHandle.Set(String name, String value)
   at Confluent.Kafka.Producer`2.<>c__DisplayClass56_0.<.ctor>b__5(KeyValuePair`2 kvp)
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Confluent.Kafka.Producer`2..ctor(ProducerBuilder`2 builder)
   at Confluent.Kafka.ProducerBuilder`2.Build()
   at MassTransit.KafkaIntegration.KafkaProducerContext..ctor(ProducerBuilder`2 producerBuilder, IHostConfiguration hostConfiguration, CancellationToken cancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaProducerContext.cs:line 24
   at MassTransit.KafkaIntegration.ProducerContextFactory.<>c__DisplayClass7_0.<CreateProducer>g__Create|0(ClientContext clientContext, CancellationToken createCancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ProducerContextFactory.cs:line 55

关键

schema.registry.url
在confluenceConfiguration中设置(如果没有,CachedSchemaRegistryClient的构造函数已经抛出异常)。

我还缺少什么吗? 我需要打电话

config.Host(...)
吗?

为了完整起见,

appsettings.json

"Kafka": {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "SaslPlaintext",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "***",
    "sasl.password": "***",
    "schema.registry.url": "http://localhost:8081"
  }

使用

ToConfluentConfiguration
扩展方法读取。

c# masstransit confluent-schema-registry confluent-kafka-dotnet
1个回答
0
投票

Kafka 不知道该属性

schema.registry.url
- 所以它通过该异常告诉你。

此外,仅供参考,MassTransit 发件箱不适用于 Kafka。仅限巴士交通。

© www.soinside.com 2019 - 2024. All rights reserved.