我们用 C# 代码按照 Avro 模式向 Kafka 生产记录,并且可以用 C# 正常消费同一主题;但是当我们想在 Python 中消费该主题时,会收到以下错误:
Message deserialization failed: Message deserialization failed for message at [topic name] [4] offset 17: message does not start with magic byte
当我们用 Python 生产并用 Python 消费时,整个过程同样可以正常工作。
附注:C# 代码中使用的是 IAsyncSerializer,Python 代码中使用的是 AvroConsumer。
我们尝试基于模式注册表(Schema Registry)上的同一个 Avro 模式,在 C# 中序列化记录并在 Python 中反序列化。
这里我们有生产者C#代码:
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;
/// <summary>
/// Serializes an Avro specific record to bytes for use as a Kafka message key or value.
/// </summary>
/// <remarks>
/// Schema-registry-aware consumers (for example Python's <c>AvroConsumer</c>) expect every
/// payload to use the Confluent wire format: one magic byte (0), the schema id as a 4-byte
/// big-endian integer, then the Avro binary body. A payload without that prefix is rejected
/// with "message does not start with magic byte".
/// When the serializer is built with a schema registry client it emits that framing; when
/// built with the parameterless constructor it emits the bare Avro binary body only.
/// </remarks>
/// <typeparam name="T">The Avro specific record type to serialize.</typeparam>
public class KafkaAvroSerializer<T> : IAsyncSerializer<T>
    where T : class, ISpecificRecord
{
    // First byte of every Confluent-framed payload.
    private const byte MagicByte = 0;

    private readonly Confluent.SchemaRegistry.ISchemaRegistryClient _schemaRegistryClient;
    private readonly string _subject;

    /// <summary>
    /// Creates a serializer that writes the bare Avro binary body with no schema-registry framing.
    /// </summary>
    public KafkaAvroSerializer()
        : this(null, null)
    {
    }

    /// <summary>
    /// Creates a serializer that prefixes each payload with the Confluent wire-format header.
    /// </summary>
    /// <param name="schemaRegistryClient">
    /// Client used to look up the id of the record's schema. When null, no header is written.
    /// </param>
    /// <param name="subject">
    /// Subject the schema is registered under. When null, the subject is derived from the topic
    /// as "{topic}-key" or "{topic}-value".
    /// </param>
    public KafkaAvroSerializer(
        Confluent.SchemaRegistry.ISchemaRegistryClient schemaRegistryClient,
        string subject = null)
    {
        _schemaRegistryClient = schemaRegistryClient;
        _subject = subject;
    }

    /// <summary>
    /// Serializes <paramref name="data"/> using the schema exposed by the record itself.
    /// </summary>
    /// <param name="data">The record to serialize; null yields a null payload.</param>
    /// <param name="context">Topic and key/value component the payload is produced for.</param>
    /// <returns>The serialized bytes, or null when <paramref name="data"/> is null.</returns>
    public async Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        if (data == null)
        {
            return null;
        }

        // Read the schema once; the record may build it on every access.
        var schema = data.Schema;

        using (var stream = new MemoryStream())
        {
            if (_schemaRegistryClient != null)
            {
                var subject = _subject
                    ?? context.Topic + (context.Component == MessageComponentType.Key ? "-key" : "-value");

                // Looks the schema up under the subject; it must already be registered there.
                int schemaId = await _schemaRegistryClient
                    .GetSchemaIdAsync(subject, schema.ToString())
                    .ConfigureAwait(false);

                stream.WriteByte(MagicByte);
                stream.WriteByte((byte)(schemaId >> 24));
                stream.WriteByte((byte)(schemaId >> 16));
                stream.WriteByte((byte)(schemaId >> 8));
                stream.WriteByte((byte)schemaId);
            }

            var encoder = new BinaryEncoder(stream);
            var writer = new SpecificDefaultWriter(schema);
            writer.Write(data, encoder);
            encoder.Flush();

            return stream.ToArray();
        }
    }
}
using Avro;
using Avro.Specific;
/// <summary>
/// Avro specific record with three fields, each declared in the schema as a union of
/// null and a primitive type (int, string, string) with a null default.
/// </summary>
public class MyRecord : ISpecificRecord
{
    // Parsed once and shared: the schema text is constant, so parsing it again on every
    // access of the Schema property would repeat the same work for each message.
    private static readonly Schema s_schema = Avro.Schema.Parse(@"{
  ""name"": ""name"",
  ""namespace"": ""namespace"",
  ""type"": ""record"",
  ""fields"": [
    {
      ""name"": ""field1"",
      ""type"": [
        ""null"",
        ""int""
      ],
      ""default"": null
    },
    {
      ""name"": ""field2"",
      ""type"": [
        ""null"",
        ""string""
      ],
      ""default"": null
    },
    {
      ""name"": ""field3"",
      ""type"": [
        ""null"",
        ""string""
      ],
      ""default"": null
    }
  ]
}");

    /// <summary>The Avro schema describing this record.</summary>
    public Schema Schema => s_schema;

    public int field1 { get; set; }
    public string field2 { get; set; }
    public string field3 { get; set; }

    /// <summary>Returns the value of the field at the given schema position.</summary>
    /// <param name="fieldPos">Zero-based field position (0 = field1, 1 = field2, 2 = field3).</param>
    /// <exception cref="AvroRuntimeException">The position is not 0, 1 or 2.</exception>
    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return field1;
            case 1: return field2;
            case 2: return field3;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    /// <summary>Assigns a value to the field at the given schema position.</summary>
    /// <param name="fieldPos">Zero-based field position (0 = field1, 1 = field2, 2 = field3).</param>
    /// <param name="fieldValue">The value to store; it is cast to the field's declared type.</param>
    /// <exception cref="AvroRuntimeException">The position is not 0, 1 or 2.</exception>
    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            // NOTE(review): the schema allows null for field1, but the property is a
            // non-nullable int, so a null fieldValue makes this cast throw. Confirm whether
            // null can reach this branch.
            case 0: field1 = (int)fieldValue; break;
            case 1: field2 = (string)fieldValue; break;
            case 2: field3 = (string)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}
using Confluent.Kafka;
using Confluent.SchemaRegistry;
/// <summary>
/// Converts the event to its Avro model and produces it to the configured topic.
/// </summary>
/// <typeparam name="T">The type of the event being published.</typeparam>
/// <param name="event">The event to publish; a null event is ignored.</param>
/// <returns>A task that completes once the broker has acknowledged the message.</returns>
/// <exception cref="ProduceException{TKey, TValue}">Delivery failed; logged and rethrown.</exception>
public async Task Publish<T>(T @event)
{
    // Nothing to publish, so return before a producer is created.
    if (@event == null)
    {
        return;
    }

    using (var producer = GetProducer())
    {
        try
        {
            var kafkaEvent = KafkaEventFactory.Create(@event);
            var avroModel = AvroModelFactory.Create(kafkaEvent);
            var message = new Message<string, MyRecord>() { Value = avroModel, Key = kafkaEvent.Key };
            var deliveryResult = await producer.ProduceAsync(_kafkaOption.Topic, message);
            _logger.LogInformation($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
        }
        catch (ProduceException<string, MyRecord> e)
        {
            _logger.LogError($"Delivery failed: {e.Error.Reason}");
            // Rethrow with the original stack trace so callers can react to the failure.
            throw;
        }
    }
}
/// <summary>
/// Builds a producer with string keys and <see cref="MyRecord"/> values that logs client
/// errors and serializes values with <see cref="KafkaAvroSerializer{T}"/>.
/// </summary>
/// <returns>A new producer; the caller is responsible for disposing it.</returns>
private IProducer<string, MyRecord> GetProducer()
{
    var config = GetProducerConfig();

    // NOTE(review): the value serializer is constructed here without a schema registry
    // client. Schema-registry-aware consumers such as Python's AvroConsumer require payloads
    // that start with the Confluent magic byte and schema id; confirm that the serializer
    // configured below produces that framing.
    return new ProducerBuilder<string, MyRecord>(config)
        .SetErrorHandler((_, error) =>
        {
            _logger.LogError(error.ToString());
        })
        .SetValueSerializer(new KafkaAvroSerializer<MyRecord>())
        .Build();
}
/// <summary>
/// Creates the producer configuration.
/// </summary>
/// <returns>A configuration with only the bootstrap servers set.</returns>
private ProducerConfig GetProducerConfig()
{
    return new ProducerConfig
    {
        // NOTE(review): presumably a placeholder for the real broker list; confirm.
        BootstrapServers = "BootstrapServers"
    };
}
/// <summary>
/// Creates a caching schema registry client.
/// </summary>
/// <returns>A new client; the caller is responsible for disposing it.</returns>
private CachedSchemaRegistryClient GetSchemaRegistry()
{
    // NOTE(review): presumably a placeholder for the real registry URL; confirm.
    var registryConfig = new SchemaRegistryConfig { Url = "SchemaRegistryUrl" };
    return new CachedSchemaRegistryClient(registryConfig);
}
这也是 Python 的消费者代码:
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import avro
import requests
# Connection settings. The bracketed parts are placeholders to be replaced with real values.
bootstrap_servers = "[bootstrap_server_ip]:[bootstrap_server_port]"
topic = "[topic_name]"
schema_registry_url = "http://[schema_registry_ip]:[schema_registry_port]"
avro_schema_name = "[schema_name]"

# Settings handed to the Avro consumer: broker list, schema registry location,
# consumer group, and where to start reading when the group has no committed offset.
consumer_config = dict(
    [
        ("bootstrap.servers", bootstrap_servers),
        ("schema.registry.url", schema_registry_url),
        ("group.id", "8"),
        ("auto.offset.reset", "earliest"),
    ]
)
def fetch_avro_schema(schema_registry_url, schema_name, timeout=10):
    """Fetch the latest version of a subject's schema and parse it as Avro.

    Args:
        schema_registry_url: Base URL of the schema registry, without a trailing slash.
        schema_name: Subject whose latest schema version is requested.
        timeout: Seconds to wait for the registry before giving up.

    Returns:
        The schema parsed by ``avro.loads``.

    Raises:
        ValueError: The registry answered with a status other than 200.
    """
    url = f"{schema_registry_url}/subjects/{schema_name}/versions/latest"
    # Without a timeout, requests waits indefinitely on an unresponsive registry.
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise ValueError(f"Failed to fetch Avro schema for {schema_name}.")
    return avro.loads(response.json()["schema"])
# Each setup step reports its own failure and then re-raises: continuing would only
# fail later with a NameError on the variable the step was meant to define.
try:
    avro_schema = fetch_avro_schema(schema_registry_url, avro_schema_name)
except Exception as e:
    print(f"While fetch avro schema {e}")
    raise

try:
    consumer = AvroConsumer(consumer_config, reader_value_schema=avro_schema)
except Exception as e:
    print(f"While Avro Consumer {e}")
    raise

try:
    try:
        consumer.subscribe([topic])
    except Exception as e:
        print(f"While Subscribing Topic {e}")
        raise

    # Poll until the first message is printed, then stop.
    while True:
        try:
            message = consumer.poll(1)
            if message is None:
                continue
            # A returned message can carry a broker/client error instead of a payload.
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            # Deserialize with schema
            print(message.value())
            break
        except SerializerError as e:
            print(f"Message deserialization failed: {e}")
finally:
    # Leave the consumer group cleanly.
    consumer.close()
我们期望在生产和消费使用同一模式时,整个过程可以毫无错误地执行!但当我用 Python 消费时,却收到了如下错误:
Message deserialization failed: Message deserialization failed for message at [topic name] [4] offset 17: message does not start with magic byte