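Hi, I am working with Kafka and .NET. I am trying to produce a message as shown below. I have created the configuration for the Schema Registry and the producer as follows, and I am using a certificate to connect to the Schema Registry.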
var schemaRegistryConfig = new SchemaRegistryConfig
{
    Url = "",
    SslKeystoreLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/decodedca.p12"),
    SslKeystorePassword = "",
    EnableSslCertificateVerification = false
};
var producerConfig = new ProducerConfig
{
    BootstrapServers = "3",
    SaslMechanism = SaslMechanism.ScramSha512,
    SaslUsername = "",
    SaslPassword = "",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SslCaLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/srdecodedca.crt"),
    EnableSslCertificateVerification = false
};
var avroSerializerConfig = new AvroSerializerConfig
{
    // optional Avro serializer properties:
    BufferBytes = 100
};
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var producer =
    new ProducerBuilder<string, MLFixtureEmp>(producerConfig)
        .SetValueSerializer(new AvroSerializer<MLFixtureEmp>(schemaRegistry, avroSerializerConfig))
        .Build())
{
    MLFixtureEmp mLFixtureEmp = new MLFixtureEmp()
    {
        deliveryDate = DateTime.Now,
        ISOCurrencyCode = "INR",
        chartererName = "ss",
        estimateRedeliveryDate = DateTime.Today,
        fixtureId = 4,
        imoNumber = "123",
        managingOwnerName = "dd",
        maximumDate = DateTime.Now,
        minimumDate = DateTime.Now,
        purchaseObligation = "stri",
        rate = 123,
        redeliveryPortName = "dd",
        RedeliveryRanges = "dsd",
        vesselOwnershipType = "dsds"
    };
    producer.ProduceAsync("mytopic", new Message<string, MLFixtureEmp>
        {
            Key = "test",
            Value = mLFixtureEmp
        }).ContinueWith(
            task =>
            {
                if (!task.IsFaulted)
                {
                    Console.WriteLine($"produced to: {task.Result.TopicPartitionOffset}");
                    return;
                }
                Console.WriteLine($"error producing message: {task.Exception.InnerException}");
            });
}
Whenever I try to push data to the topic, I get the following error message.
error producing message: Confluent.Kafka.ProduceException`2[System.String,MaerskLine.CHAMPS.Dto.MLFixtureEmp]: Local: Value serialization error
---> System.InvalidOperationException: AvroSerializer only accepts type parameters of int, bool, double, string, float, long, byte[], instances of ISpecificRecord and subclasses of SpecificFixed.
at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize)
at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.SerializeAsync(T value, SerializationContext context)
at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
I can't figure out what is wrong. Can someone help me identify the problem? Any help would be appreciated. Thanks.
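As the error message indicates, AvroSerializer only accepts int, bool, double, string, float, long, byte[], instances of ISpecificRecord, and subclasses of SpecificFixed. Judging by the exception, the MLFixtureEmp type falls into none of these categories.
So one solution is to make the MLFixtureEmp type implement the ISpecificRecord interface.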
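To illustrate, here is a minimal sketch of what an ISpecificRecord implementation could look like. It is not the asker's real class: it keeps only three of the original fields (fixtureId, imoNumber, rate), and the Avro schema text is an assumption written to match them. The DateTime fields are left out because they would need Avro logical types. The public static _SCHEMA field follows the convention used by avrogen-generated classes, which, as far as I know, the Confluent serializer relies on to find the writer schema.

```csharp
using System;
using Avro;
using Avro.Specific;

namespace MaerskLine.CHAMPS.Dto
{
    // Hypothetical, trimmed-down MLFixtureEmp; the real class has more fields.
    public class MLFixtureEmp : ISpecificRecord
    {
        // Assumed schema: field order here defines the positions used in Get/Put.
        public static Schema _SCHEMA = Schema.Parse(@"{
            ""type"": ""record"",
            ""name"": ""MLFixtureEmp"",
            ""namespace"": ""MaerskLine.CHAMPS.Dto"",
            ""fields"": [
                { ""name"": ""fixtureId"", ""type"": ""int"" },
                { ""name"": ""imoNumber"", ""type"": ""string"" },
                { ""name"": ""rate"", ""type"": ""double"" }
            ]
        }");

        public int fixtureId { get; set; }
        public string imoNumber { get; set; }
        public double rate { get; set; }

        // Instance-level access to the schema, required by ISpecificRecord.
        public virtual Schema Schema => _SCHEMA;

        // Returns the value of the field at the given schema position.
        public virtual object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return fixtureId;
                case 1: return imoNumber;
                case 2: return rate;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            }
        }

        // Assigns the value of the field at the given schema position.
        public virtual void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: fixtureId = (int)fieldValue; break;
                case 1: imoNumber = (string)fieldValue; break;
                case 2: rate = (double)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            }
        }
    }
}
```

Writing this by hand gets tedious and error-prone for a class with many fields. A common alternative is to describe the record in an .avsc schema file and generate the class with the avrogen tool from Apache Avro, which produces the same kind of ISpecificRecord implementation.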