I have set up MassTransit with Kafka and the outbox pattern:
var confluentConfiguration = builder.Configuration.GetSection("Kafka").ToConfluentConfiguration();
builder.Services.AddSingleton<ISchemaRegistryClient>(new CachedSchemaRegistryClient(confluentConfiguration));
builder.Services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddEntityFrameworkOutbox<DatabaseContext>(outboxConfigurator =>
    {
        outboxConfigurator.DuplicateDetectionWindow = outboxOptions.DuplicateDetectionWindow;
        outboxConfigurator.IsolationLevel = outboxOptions.IsolationLevel;
        outboxConfigurator.QueryDelay = outboxOptions.QueryDelay;
        outboxConfigurator.QueryMessageLimit = outboxOptions.QueryMessageLimit;
        outboxConfigurator.QueryTimeout = outboxOptions.QueryTimeout;
        outboxConfigurator.UsePostgres();
        outboxConfigurator.UseBusOutbox();
    });
    busConfigurator.UsingInMemory(); // MassTransit needs a bus, so we use the in-memory bus
    busConfigurator.AddRider(rider =>
    {
        rider.AddProducer<string, UserCreated>("user-created", (context, config) =>
        {
            config.SetValueSerializer(new ProtobufSerializer<UserCreated>(context.GetRequiredService<ISchemaRegistryClient>()));
        });
        rider.UsingKafka(new ProducerConfig(confluentConfiguration), (context, config) =>
        {
            config.UseSendFilter(typeof(CustomCloudEventsHeaderFilter<>), context);
        });
    });
});
Then I inject
ITopicProducer<string, UserCreated> userCreatedProducer
into my minimal API method and call await userCreatedProducer.Produce(...).
This throws the following exception:
System.InvalidOperationException: No such configuration property: "schema.registry.url"
at Confluent.Kafka.Impl.SafeConfigHandle.Set(String name, String value)
at Confluent.Kafka.Producer`2.<>c__DisplayClass56_0.<.ctor>b__5(KeyValuePair`2 kvp)
at System.Collections.Generic.List`1.ForEach(Action`1 action)
at Confluent.Kafka.Producer`2..ctor(ProducerBuilder`2 builder)
at Confluent.Kafka.ProducerBuilder`2.Build()
at MassTransit.KafkaIntegration.KafkaProducerContext..ctor(ProducerBuilder`2 producerBuilder, IHostConfiguration hostConfiguration, CancellationToken cancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaProducerContext.cs:line 24
at MassTransit.KafkaIntegration.ProducerContextFactory.<>c__DisplayClass7_0.<CreateProducer>g__Create|0(ClientContext clientContext, CancellationToken createCancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ProducerContextFactory.cs:line 55
The key schema.registry.url is set in confluentConfiguration (if it were not, the constructor of CachedSchemaRegistryClient would already have thrown an exception).
Am I missing something? Do I need to call config.Host(...)?
For completeness, here is the relevant part of appsettings.json:
"Kafka": {
  "bootstrap.servers": "localhost:9092",
  "security.protocol": "SaslPlaintext",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "***",
  "sasl.password": "***",
  "schema.registry.url": "http://localhost:8081"
}
It is read using the ToConfluentConfiguration extension method.
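The Kafka producer does not know the property schema.registry.url, so it tells you so with that exception. As the stack trace shows, the error is raised while the Confluent.Kafka producer is being built from the ProducerConfig, which received the same settings as the schema registry client.
Also, FYI, the MassTransit outbox does not work with Kafka. It only works with bus transports.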
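Regarding the schema.registry.url error, one possible approach is to split the settings so that the producer never sees the schema registry keys. The following is only a sketch: KafkaSettingsSplitter and its Split method are hypothetical names, and it assumes that ToConfluentConfiguration yields string key/value pairs and that every schema registry setting starts with the prefix "schema.registry.".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of MassTransit or Confluent.Kafka).
public static class KafkaSettingsSplitter
{
    // Assumption: schema registry client settings all share this prefix.
    private const string SchemaRegistryPrefix = "schema.registry.";

    // Separates the settings meant for the Kafka client from those meant
    // for the schema registry client.
    public static (Dictionary<string, string> Client, Dictionary<string, string> SchemaRegistry) Split(
        IEnumerable<KeyValuePair<string, string>> settings)
    {
        var client = new Dictionary<string, string>();
        var schemaRegistry = new Dictionary<string, string>();

        foreach (var (key, value) in settings)
        {
            var target = key.StartsWith(SchemaRegistryPrefix, StringComparison.Ordinal)
                ? schemaRegistry
                : client;
            target[key] = value;
        }

        return (client, schemaRegistry);
    }
}
```

Under those assumptions, the Client dictionary would be the one passed to new ProducerConfig(...) and the SchemaRegistry dictionary the one passed to new CachedSchemaRegistryClient(...), so each client only receives properties it recognizes.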