C# 与 avro 序列化相融合的 kafka 问题

问题描述 投票:0回答:3

我正在使用 docker 运行 kafka 和来自 https://github.com/confluenceinc/cp-all-in-one 的其他服务 在我的测试项目中使用 kafka、avro 和 schemaRegistry 的融合 nuget 包。

如果要发送 json 消息,到目前为止我没有问题,但我正在努力发送 avro 序列化消息。

我看到了https://github.com/confluenceinc/confluence-kafka-dotnet/tree/master/examples/AvroSpecific示例,我尝试以同样的方式执行此操作,但最终我得到了如下异常:

本地:值序列化错误
在 Confluence.Kafka.Producer

2.<ProduceAsync>d__52.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter
1.GetResult() 在 Kafka_ Producer.KafkaService.d__10.MoveNext() 中 C:\Users\lu95eb\source epos\Kafka_playground\Kafka 生产者\KafkaService.cs:第 126 行

内部例外

未将对象引用设置为对象的实例。
在 Confluence.SchemaRegistry.Serdes.SpecificSerializerImpl

1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize) at Confluent.SchemaRegistry.Serdes.AvroSerializer
1.d__6.MoveNext() 在 System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务) 在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务) 在 System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(任务任务) 在 Confluence.Kafka.Producer`2.d__52.MoveNext()

这是我的 SpecificRecord 类

public class UserInfo : ISpecificRecord
{
    public string Name { get; set; }
    public int[] Numbers { get; set; }

    public Schema Schema => Schema.Parse(@"
        {
          ""name"": ""UserInfo"",
          ""type"": ""record"",
          ""namespace"": ""kafka"",
          ""fields"": [
            {
              ""name"": ""Name"",
              ""type"": ""string""
            },
            {
              ""name"": ""Numbers"",
              ""type"": {
                ""type"": ""array"",
                ""items"": ""int""
              }
            }
          ]
        }
        ");

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Numbers;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Numbers = (int[])fieldValue; break;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
        }
    }
}

以及发送消息的方法

private async Task SendSpecificRecord(UserInfo userInfo)
    {
        using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
        using (var producer =
            new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
                .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                .SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
                .Build())
        {

            var message = new Message<string, UserInfo>
            {
                Key = userInfo.Name,
                Value = userInfo
            };


            await producer.ProduceAsync(SpecificTopic, message);
        }
    }

KafkaService.cs:第 126 行是

await producer.ProduceAsync(SpecificTopic, message);

就像我在开始时写的那样,我对 schemaRegistry 没有任何问题 - 我注册了模式并且它们可以正常工作于 json,我对主题、代理、消费者或其他任何东西都没有问题。

如果有人能指出我做错了什么,我将不胜感激。 预先感谢您。

c# apache-kafka avro confluent-platform
3个回答
2
投票

如果有人对解决方案感到好奇(我无法想象有人会怎样;)) 然后我编写了“自定义”avro 序列化器和反序列化器,并且工作起来就像一个魅力。

public class CustomAvroSerializer<T> : IAsyncSerializer<T>
    where T : class, ISpecificRecord
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        return Task.Run(() =>
        {
            using (var ms = new MemoryStream())
            {
                var enc = new BinaryEncoder(ms);
                var writer = new SpecificDefaultWriter(data.Schema);
                writer.Write(data, enc);
                return ms.ToArray();
            }
        });
    }
}

public class CustomAvroDeserializer<T> : IDeserializer<T>
    where T : class, ISpecificRecord
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        using (var ms = new MemoryStream(data.ToArray()))
        {
            var dec = new BinaryDecoder(ms);
            var regenObj = (T)Activator.CreateInstance(typeof(T));

            var reader = new SpecificDefaultReader(regenObj.Schema, regenObj.Schema);
            reader.Read(regenObj, dec);
            return regenObj;
        }
    }
}

2
投票

我遇到了同样的问题,在查看 github 上的库代码后能够解决它。 看来模式注册表需要在实现 ISpecificRecord 的类中调用 _SCHEMA 的静态字段。

所以如果你添加一个 公共静态_SCHEMA = Schema.Parse(....

并更改您的公共架构 => UserInfo._SCHEMA;

无需您的解决方法,它也可以工作,它只是忽略架构注册表。


0
投票
.SetValueSerializer(new AvroSerializer<P>(schemaRegistryClient, new AvroSerializerConfig() { AutoRegisterSchemas = false, UseLatestVersion = true}))
© www.soinside.com 2019 - 2024. All rights reserved.