Python cannot consume a Kafka topic produced from C# with an Avro schema from Schema Registry


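We have C# code that produces records to Kafka using an Avro schema, and we can also consume the same topic from C#. But when we try to consume it in Python, we get the following error: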

Message deserialization failed: Message deserialization failed for message at [topic name] [4] offset 17: message does not start with magic byte

Producing with Python and consuming with Python also works fine.

P.S. On the C# side we use a custom IAsyncSerializer; on the Python side we use AvroConsumer.
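For context on the error: the Confluent Schema Registry deserializers (including the one behind AvroConsumer) expect every message value in the Confluent wire format, meaning one magic byte 0x00, then a 4-byte big-endian schema ID, then the Avro binary body. A minimal standard-library sketch of that check follows; the function name and the schema ID 42 are illustrative only and are not part of confluent_kafka.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0x00  # first byte of every Confluent-framed message


def parse_confluent_header(value: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed value into (schema_id, avro_body).

    Raises ValueError when the bytes are not in the Confluent wire format,
    which is the condition behind "does not start with magic byte".
    """
    if len(value) < 5 or value[0] != MAGIC_BYTE:
        raise ValueError("message does not start with magic byte")
    (schema_id,) = struct.unpack(">I", value[1:5])  # 4-byte big-endian ID
    return schema_id, value[5:]


# Framed value: magic byte, hypothetical schema ID 42, then an Avro body.
framed = b"\x00" + struct.pack(">I", 42) + b"\x02\x02"
print(parse_confluent_header(framed))  # (42, b'\x02\x02')

# Bare Avro body with no header.
try:
    parse_confluent_header(b"\x02\x02\x02\x02a\x00")
except ValueError as err:
    print(err)  # message does not start with magic byte
```

A value that is only a bare Avro body fails this check immediately, whatever schema it was written with.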

We are trying to serialize records in C# and deserialize them in Python, both based on the same Avro schema in Schema Registry.

Here is our C# producer code:

using System.IO;
using System.Threading.Tasks;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;

// Custom value serializer: writes the record as plain Avro binary
// (no magic byte and no schema ID prefix).
public class KafkaAvroSerializer<T> : IAsyncSerializer<T>
    where T : class, ISpecificRecord
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        return Task.Run(() =>
        {
            using (var ms = new MemoryStream())
            {
                var enc = new BinaryEncoder(ms);
                var writer = new SpecificDefaultWriter(data.Schema);
                writer.Write(data, enc);
                return ms.ToArray();
            }
        });
    }
}
using Avro;
using Avro.Specific;

public class MyRecord : ISpecificRecord
{
    public Schema Schema =>
        Schema.Parse(@"{
      ""name"": ""name"",
      ""namespace"": ""namespace"",
      ""type"": ""record"",
      ""fields"": [
        {
          ""name"": ""field1"",
          ""type"": [
            ""null"",
            ""int""
          ],
          ""default"": null
        },
        {
          ""name"": ""field2"",
          ""type"": [
            ""null"",
            ""string""
          ],
          ""default"": null
        },
        {
          ""name"": ""field3"",
          ""type"": [
            ""null"",
            ""string""
          ],
          ""default"": null
        }
      ]
    }");

    public int field1 { get; set; }
    public string field2 { get; set; }
    public string field3 { get; set; }

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return field1;
            case 1: return field2;
            case 2: return field3;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: field1 = (int)fieldValue; break;
            case 1: field2 = (string)fieldValue; break;
            case 2: field3 = (string)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}
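To see what the custom KafkaAvroSerializer puts on the wire for this schema, here is a hand-rolled Python encoder for the three fields, following the Avro binary encoding rules (ints and lengths as zigzag varints; a union is written as its branch index followed by the value). It is an illustrative sketch, not the Apache Avro library, and the function names are hypothetical. Because field1 is declared as a plain int in C#, the writer should always pick the "int" branch (index 1), so the first byte would be 0x02 rather than the 0x00 magic byte.

```python
from typing import Optional


def zigzag_varint(n: int) -> bytes:
    """Encode an Avro int/long: zigzag, then little-endian base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag mapping for 64-bit values
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_nullable_int(v: Optional[int]) -> bytes:
    # Union ["null", "int"]: branch 0 is null (no payload), branch 1 is int.
    return zigzag_varint(0) if v is None else zigzag_varint(1) + zigzag_varint(v)


def encode_nullable_string(v: Optional[str]) -> bytes:
    # Union ["null", "string"]: a string is its byte length, then UTF-8 bytes.
    if v is None:
        return zigzag_varint(0)
    data = v.encode("utf-8")
    return zigzag_varint(1) + zigzag_varint(len(data)) + data


def encode_my_record(field1: Optional[int], field2: Optional[str],
                     field3: Optional[str]) -> bytes:
    # Fields are concatenated in schema order, with no header of any kind.
    return (encode_nullable_int(field1)
            + encode_nullable_string(field2)
            + encode_nullable_string(field3))


body = encode_my_record(1, "a", None)
print(body.hex())  # 020202026100
```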
using Confluent.Kafka;
using Confluent.SchemaRegistry;
public async Task Publish<T>(T @event) where T 
{
    using (var producer = GetProducer())
    {
        try
        {
            if (@event == null)
            {
                return;
            }

            var kafkaEvent = KafkaEventFactory.Create(@event);
            
            var avroModel = AvroModelFactory.Create(kafkaEvent);
            var message = new Message<string, MyRecord>() { Value = avroModel, Key = kafkaEvent.Key };

            var deliveryResult = await producer.ProduceAsync(_kafkaOption.Topic, message);
            _logger.LogInformation($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
        }
        catch (ProduceException<string, MyRecord> e)
        {
            _logger.LogError($"Delivery failed: {e.Error.Reason}");
            throw;
        }
    }
}
private IProducer<string, MyRecord> GetProducer()
{
    var config = GetProducerConfig();
    var schemaRegistry = GetSchemaRegistry();

    var producer = new ProducerBuilder<string, MyRecord>(config)
        .SetErrorHandler((_, error) =>
        {
            _logger.LogError(error.ToString());
        })
        .SetValueSerializer(new KafkaAvroSerializer<MyRecord>())
        .Build();

    return producer;
}

private ProducerConfig GetProducerConfig()
{
    return new ProducerConfig
    {
        BootstrapServers = "BootstrapServers"
    };
}

private CachedSchemaRegistryClient GetSchemaRegistry()
{
    return new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "SchemaRegistryUrl" });
}
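Note that GetProducer creates schemaRegistry but never passes it to the value serializer, so nothing on the C# side looks up a schema ID or writes a header. The sketch below shows the framing that a Schema Registry-aware serializer adds in front of the Avro body; the function name and the schema ID 7 are hypothetical. On the .NET side, Confluent's Confluent.SchemaRegistry.Serdes.Avro package provides an AvroSerializer<T> that takes the registry client and writes this format; it may expect avrogen-style record classes (for example a static _SCHEMA field), so treat that as something to verify against your MyRecord class.

```python
import struct


def to_confluent_wire_format(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an Avro body with 0x00 and a 4-byte big-endian schema ID."""
    if not 0 <= schema_id < 2**32:
        raise ValueError("schema ID must fit in 4 unsigned bytes")
    return b"\x00" + struct.pack(">I", schema_id) + avro_body


raw = bytes.fromhex("020202026100")  # bare body for field1=1, field2="a", field3=null
framed = to_confluent_wire_format(7, raw)  # 7: hypothetical registry ID
print(framed.hex())  # 0000000007020202026100
```

The body bytes are identical in both cases; only the 5-byte prefix differs, which is why the same schema works C#-to-C# but not C#-to-AvroConsumer.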

And here is our Python consumer code:

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import avro
import requests

bootstrap_servers = '[bootstrap_server_ip]:[bootstrap_server_port]'
topic = '[topic_name]'

schema_registry_url = "http://[schema_registry_ip]:[schema_registry_port]"
avro_schema_name = "[schema_name]"

consumer_config = {
   'bootstrap.servers': bootstrap_servers,
   'schema.registry.url': schema_registry_url,
   'group.id': '8',
   "auto.offset.reset": "earliest",
}

def fetch_avro_schema(schema_registry_url, schema_name):
    url = f"{schema_registry_url}/subjects/{schema_name}/versions/latest"
    response = requests.get(url)
    if response.status_code == 200:
        schema = response.json()["schema"]
        return avro.loads(schema)
    else:
        raise ValueError(f"Failed to fetch Avro schema for {schema_name}.")
try:
    avro_schema = fetch_avro_schema(schema_registry_url, avro_schema_name)
except Exception as e:
    print(f"While fetching Avro schema: {e}")
    raise

try:
    consumer = AvroConsumer(consumer_config, reader_value_schema=avro_schema)
except Exception as e:
    print(f"While creating AvroConsumer: {e}")
    raise

try:
    consumer.subscribe([topic])
except Exception as e:
    print(f"While subscribing to topic: {e}")
    raise

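try:
    while True:
        try:
            message = consumer.poll(1)
            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue

            # AvroConsumer has already deserialized the value via Schema Registry
            print(message.value())
            break
        except SerializerError as e:
            print(f"Message deserialization failed: {e}")
finally:
    consumer.close()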
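The fetch_avro_schema helper calls GET /subjects/{subject}/versions/latest. That response also carries the schema's numeric id, which is exactly what the 4 bytes after the magic byte are supposed to contain. A small sketch, assuming the default TopicNameStrategy (the value subject is named <topic>-value); the helper names and the sample payload are hypothetical:

```python
import json
from typing import Tuple


def latest_version_url(registry_url: str, topic: str) -> str:
    # Assumes TopicNameStrategy: the value schema lives under "<topic>-value".
    return f"{registry_url.rstrip('/')}/subjects/{topic}-value/versions/latest"


def parse_latest_version(response_text: str) -> Tuple[int, dict]:
    """Return (schema_id, parsed_schema) from a /versions/latest response body."""
    payload = json.loads(response_text)
    # "schema" is itself a JSON-encoded string inside the response.
    return payload["id"], json.loads(payload["schema"])


sample = json.dumps({
    "subject": "my-topic-value",
    "version": 1,
    "id": 7,
    "schema": json.dumps({"type": "record", "name": "MyRecord", "fields": []}),
})
print(latest_version_url("http://localhost:8081", "my-topic"))
# http://localhost:8081/subjects/my-topic-value/versions/latest
schema_id, schema = parse_latest_version(sample)
print(schema_id, schema["name"])  # 7 MyRecord
```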

We expected that producing and consuming with the same schema would work without errors. But when we consume with Python, we get this error:

Message deserialization failed: Message deserialization failed for message at [topic name] [4] offset 17: message does not start with magic byte
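If the C# producer cannot be changed right away, one possible Python-side workaround is to read the raw bytes (for example msg.value() from a plain confluent_kafka.Consumer with no Avro deserializer) and decode the headerless Avro body directly. The decoder below is a hand-written sketch for this one three-field schema only, assuming every value is a bare body as written by KafkaAvroSerializer; the function names are hypothetical, and a general-purpose Avro library would be the more robust choice.

```python
from typing import Callable, Optional, Tuple


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one zigzag varint at pos; return (value, new_pos)."""
    shift = result = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos  # undo zigzag


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def read_nullable(buf: bytes, pos: int,
                  read_value: Callable[[bytes, int], Tuple[object, int]]
                  ) -> Tuple[Optional[object], int]:
    branch, pos = read_long(buf, pos)
    if branch == 0:  # "null" is branch 0 in every union of this schema
        return None, pos
    return read_value(buf, pos)


def decode_my_record(body: bytes) -> dict:
    # Reads the fields in schema order from a bare body (no Confluent header).
    pos = 0
    field1, pos = read_nullable(body, pos, read_long)
    field2, pos = read_nullable(body, pos, read_string)
    field3, pos = read_nullable(body, pos, read_string)
    return {"field1": field1, "field2": field2, "field3": field3}


print(decode_my_record(bytes.fromhex("020202026100")))
# {'field1': 1, 'field2': 'a', 'field3': None}
```

Switching the producer to a Schema Registry-aware serializer avoids this hand-decoding entirely.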

python c# schema avro confluent-schema-registry