Kafka 中生成消息本地值序列化错误时出错

问题描述 投票:0回答:1

嗨,我正在 Kafka 和 .Net 工作。我正在尝试生成如下消息。我已经为架构注册表和生产者创建了配置,如下所示。我正在使用证书连接到架构注册表

var schemaRegistryConfig = new SchemaRegistryConfig
    {
        Url = "",
        SslKeystoreLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/decodedca.p12"),
        SslKeystorePassword = "",
        EnableSslCertificateVerification = false
    };
 var producerConfig = new ProducerConfig
    {
        BootstrapServers = "3",
        SaslMechanism = SaslMechanism.ScramSha512,
        SaslUsername = "",
        SaslPassword = "",
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SslCaLocation = Path.Combine(Directory.GetCurrentDirectory(), @"Certs/srdecodedca.crt"),
        EnableSslCertificateVerification = false
    };
 var avroSerializerConfig = new AvroSerializerConfig
    {
        // optional Avro serializer properties:
        BufferBytes = 100
    };
 using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
    using (var producer =
        new ProducerBuilder<string, MLFixtureEmp>(producerConfig)
            .SetValueSerializer(new AvroSerializer<MLFixtureEmp>(schemaRegistry, avroSerializerConfig))
            .Build())
    {
        MLFixtureEmp mLFixtureEmp = new MLFixtureEmp()
        {
            deliveryDate = DateTime.Now,
            ISOCurrencyCode = "INR",
            chartererName = "ss",
            estimateRedeliveryDate = DateTime.Today,
            fixtureId = 4,
            imoNumber = "123",
            managingOwnerName = "dd",
            maximumDate = DateTime.Now,
            minimumDate = DateTime.Now,
            purchaseObligation = "stri",
            rate = 123,
            redeliveryPortName = "dd",
            RedeliveryRanges = "dsd",
            vesselOwnershipType = "dsds"
        };
        producer.ProduceAsync("mytopic", new Message<string, MLFixtureEmp>
        {
            Key = "test",
            Value = mLFixtureEmp
        }).ContinueWith(
            task =>
            {
                if (!task.IsFaulted)
                {
                    Console.WriteLine($"produced to: {task.Result.TopicPartitionOffset}");
                    return;
                }
                Console.WriteLine($"error producing message: {task.Exception.InnerException}");
            });
    }

每当我尝试将数据推送到主题时,我都会收到以下错误消息。

error producing message: Confluent.Kafka.ProduceException`2[System.String,MaerskLine.CHAMPS.Dto.MLFixtureEmp]: Local: Value serialization error
 ---> System.InvalidOperationException: AvroSerializer only accepts type parameters of int, bool, double, string, float, long, byte[], instances of ISpecificRecord and subclasses of SpecificFixed.
   at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize)
   at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.SerializeAsync(T value, SerializationContext context)
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
   --- End of inner exception stack trace ---
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)

我无法弄清楚这个问题。有人可以帮我找出问题吗?任何帮助,将不胜感激。谢谢

c# apache-kafka avro confluent-kafka-dotnet
1个回答
0
投票

正如错误消息所示,AvroSerializer 仅接受 int、bool、double、string、float、long、byte[]、ISpecificRecord 的实例和 SpecificFixed 的子类。

因此,一种解决方案是:MLFixtureEmp 类型应该实现 ISpecificRecord 接口。

© www.soinside.com 2019 - 2024. All rights reserved.