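I'm getting an AssertionError while decoding Avro-formatted data from a Kafka topic. We use Apicurio to encode the data on the Kafka side. I'm on Python 3.11.6, and this is the code I'm using: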
try:
    import kafka
    import json
    import requests
    import os
    import sys
    from json import dumps
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    import io
    from confluent_kafka import Consumer, KafkaError
    from avro.io import DatumReader, BinaryDecoder
    import avro.schema
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer import (SerializerError,  # noqa
                                                 KeySerializerError,
                                                 ValueSerializerError)
    print("ALL ok")
except Exception as e:
    print("Error : {} ".format(e))
SCHEME_REGISTERY = "http://localhost:8082"
TOPIC = "abc"
BROKER = "localhost:9092"
schema = """
{
"type":"record",
"name":"Envelope",
"namespace":"abc",
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"fields":[
{
"name":"field1",
"type":"double"
},
{
"name":"field2",
"type":[
"null",
"string"
],
"default":null
}
],
"connect.name":"abc.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"Value"
],
"default":null
},
{
"name":"source",
"type":{
"type":"record",
"name":"Source",
"namespace":"io.debezium.connector.oracle",
"fields":[
{
"name":"version",
"type":"string"
},
{
"name":"connector",
"type":"string"
},
{
"name":"name",
"type":"string"
},
{
"name":"ts_ms",
"type":"long"
},
{
"name":"snapshot",
"type":[
{
"type":"string",
"connect.version":1,
"connect.parameters":{
"allowed":"true,last,false,incremental"
},
"connect.default":"false",
"connect.name":"io.debezium.data.Enum"
},
"null"
],
"default":"false"
},
{
"name":"db",
"type":"string"
},
{
"name":"sequence",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"schema",
"type":"string"
},
{
"name":"table",
"type":"string"
},
{
"name":"txId",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"scn",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"commit_scn",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"lcr_position",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"rs_id",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"ssn",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"redo_thread",
"type":[
"null",
"int"
],
"default":null
}
],
"connect.name":"io.debezium.connector.oracle.Source"
}
},
{
"name":"op",
"type":"string"
},
{
"name":"ts_ms",
"type":[
"null",
"long"
],
"default":null
},
{
"name":"transaction",
"type":[
"null",
{
"type":"record",
"name":"ConnectDefault",
"namespace":"io.confluent.connect.avro",
"fields":[
{
"name":"id",
"type":"string"
},
{
"name":"total_order",
"type":"long"
},
{
"name":"data_collection_order",
"type":"long"
}
]
}
],
"default":null
}
],
"connect.name":"abc.tablename.Envelope"
}
"""
schema = avro.schema.Parse(schema)
reader = DatumReader(schema)
def decode_method_2(msg_value):
    message_bytes = io.BytesIO(msg_value)
    # Skip a 5-byte header (magic byte + 4-byte schema ID) before the Avro body
    message_bytes.seek(5)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

def main():
    print("Listening *****************")
    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=[BROKER],
        auto_offset_reset='latest',
        enable_auto_commit=False
    )
    for msg in consumer:
        msg_value = msg.value
        print("\n")
        print("decode_method_2", decode_method_2(msg_value))
        print("\n")

main()
The error I'm getting is:
AssertionError: b'\xf0@\x02 ABCRequest\x161.9.7.Final\x0cabc-xyz-xyz.abc.com\xb0\xe4\xdb\xd0\xf2b\x00\x08true\x0esystem\x00\x10abccard tablename\x00\x02\x12601987152\x00\x00\x00\x02\x00\x00\x02r\x02\xac\x90\xc9\xbf\xf2b\x00'
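A note on `message_bytes.seek(5)` in the code above: it assumes the Confluent wire format, where each message starts with a zero magic byte followed by a 4-byte big-endian schema ID. Apicurio's own serializers can be configured to frame messages differently (as far as I know, an 8-byte ID is one possibility, but that is an assumption to check against your producer settings). The hypothetical helper below is only a sketch for inspecting the header before handing the rest to Avro; `split_wire_header` and its `id_size` parameter are names made up for illustration.

```python
def split_wire_header(payload: bytes, id_size: int = 4):
    """Split a registry-framed message into (magic, schema_id, avro_body).

    id_size=4 matches the Confluent framing; pass another size only if
    you have confirmed your producer writes a different ID width.
    """
    header_len = 1 + id_size
    if len(payload) < header_len:
        raise ValueError("payload is shorter than the expected header")
    magic = payload[0]
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    # The ID is stored big-endian right after the magic byte
    schema_id = int.from_bytes(payload[1:header_len], "big")
    return magic, schema_id, payload[header_len:]
```

Printing the returned `schema_id` for a real message and comparing it with the IDs in the registry is a quick way to tell whether the assumed header size is right.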
You don't need to install and import both kafka-python and confluent-kafka-python.
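Your schema should already be in the registry, so you don't need to supply it as a string and parse it in your code. Instead, download it from the registry over HTTP.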
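As a rough sketch of the HTTP download, assuming the registry exposes the Confluent-compatible REST API (for Apicurio this is typically under a path such as `/apis/ccompat/v6`, which is an assumption about your deployment), a schema can be fetched by ID from `/schemas/ids/{id}`. The function names here are made up for illustration.

```python
import json
from urllib.request import urlopen


def schema_url(registry_url: str, schema_id: int) -> str:
    """Build the Confluent-compatible 'schema by ID' endpoint URL."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def parse_schema_response(body: bytes):
    """The response is a JSON object whose 'schema' field holds the
    Avro schema as a JSON-encoded string, so it is decoded twice."""
    return json.loads(json.loads(body)["schema"])


def fetch_schema(registry_url: str, schema_id: int, timeout: float = 10.0):
    """Download and parse one schema (needs a reachable registry)."""
    with urlopen(schema_url(registry_url, schema_id), timeout=timeout) as resp:
        return parse_schema_response(resp.read())
```

For example, `fetch_schema("http://localhost:8082/apis/ccompat/v6", 1)` would return the parsed schema for ID 1 if the registry is running at that address and the base path assumption holds.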
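The confluent-kafka-python documentation shows how to consume Avro data with its schema registry, and the same approach applies to Apicurio. Specifically, instead of parsing the Avro schema yourself, you give the consumer an AvroDeserializer instance:
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py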
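A minimal sketch along the lines of that example, using only confluent-kafka-python, might look like this. The group ID and the helper names are placeholders, the registry URL passed in must point at a Confluent-compatible endpoint (an assumption about your Apicurio setup), and it only works if the producer writes Confluent-style 4-byte schema IDs.

```python
def build_consumer_config(broker: str, group_id: str) -> dict:
    """librdkafka settings mirroring the options used in the question."""
    return {
        "bootstrap.servers": broker,
        "group.id": group_id,
        "auto.offset.reset": "latest",
        "enable.auto.commit": False,
    }


def consume(topic: str, broker: str, registry_url: str, group_id: str = "avro-demo"):
    # Imported lazily so this file loads even without confluent-kafka installed
    from confluent_kafka import Consumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer
    from confluent_kafka.serialization import MessageField, SerializationContext

    registry = SchemaRegistryClient({"url": registry_url})
    # With no schema string given, the writer schema is looked up in the
    # registry using the ID embedded in each message
    deserialize = AvroDeserializer(registry)

    consumer = Consumer(build_consumer_config(broker, group_id))
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error:", msg.error())
                continue
            ctx = SerializationContext(msg.topic(), MessageField.VALUE)
            print(deserialize(msg.value(), ctx))
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
```

With the values from the question, a call could look like `consume("abc", "localhost:9092", "http://localhost:8082/apis/ccompat/v6")`, where the `/apis/ccompat/v6` suffix is a guess at the compatibility path.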