在Python中解码avro格式数据时遇到“AssertionError”

问题描述 投票:0回答:1

我在解码来自 Kafka 主题的 Avro 格式数据时遇到了 AssertionError(断言错误)。数据在 Kafka 端是通过 Apicurio 工具编码的。我使用的是 Python 3.11.6,代码如下:


# All imports are wrapped in one try block so that a missing package is
# reported with a short message instead of a traceback.
# NOTE(review): both kafka-python (`kafka`) and confluent-kafka
# (`confluent_kafka`) are imported, but the visible script only uses
# KafkaConsumer, io, DatumReader, BinaryDecoder and avro.schema.
try:
    import kafka
    import json
    import requests
    import os
    import sys
    from json import dumps
    from kafka import KafkaProducer

    from kafka import KafkaConsumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    import io
    from confluent_kafka import Consumer, KafkaError
    from avro.io import DatumReader, BinaryDecoder
    import avro.schema

    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer import (SerializerError,  # noqa
                                                 KeySerializerError,
                                                 ValueSerializerError)

    print("ALL ok")
except Exception as e:
    # NOTE(review): the failure is only printed and execution continues, so any
    # name that was not imported will raise NameError when it is first used.
    print("Error : {} ".format(e))


# Connection settings; all of them point at localhost.
# NOTE(review): SCHEME_REGISTERY is not referenced anywhere else in the
# visible script.
SCHEME_REGISTERY = "http://localhost:8082"
# Kafka topic passed to KafkaConsumer in main().
TOPIC = "abc"
# Bootstrap server address passed to KafkaConsumer in main().
BROKER = "localhost:9092"

schema = """
{
   "type":"record",
   "name":"Envelope",
   "namespace":"abc",
   "fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"field1",
                     "type":"double"
                  },
                  {
                     "name":"field2",
                     "type":[
                        "null",
                        "string"
                     ],
                     "default":null
                  }
               ],
               "connect.name":"abc.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      },
      {
         "name":"source",
         "type":{
            "type":"record",
            "name":"Source",
            "namespace":"io.debezium.connector.oracle",
            "fields":[
               {
                  "name":"version",
                  "type":"string"
               },
               {
                  "name":"connector",
                  "type":"string"
               },
               {
                  "name":"name",
                  "type":"string"
               },
               {
                  "name":"ts_ms",
                  "type":"long"
               },
               {
                  "name":"snapshot",
                  "type":[
                     {
                        "type":"string",
                        "connect.version":1,
                        "connect.parameters":{
                           "allowed":"true,last,false,incremental"
                        },
                        "connect.default":"false",
                        "connect.name":"io.debezium.data.Enum"
                     },
                     "null"
                  ],
                  "default":"false"
               },
               {
                  "name":"db",
                  "type":"string"
               },
               {
                  "name":"sequence",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"schema",
                  "type":"string"
               },
               {
                  "name":"table",
                  "type":"string"
               },
               {
                  "name":"txId",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"scn",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"commit_scn",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"lcr_position",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"rs_id",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"ssn",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               },
               {
                  "name":"redo_thread",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               }
            ],
            "connect.name":"io.debezium.connector.oracle.Source"
         }
      },
      {
         "name":"op",
         "type":"string"
      },
      {
         "name":"ts_ms",
         "type":[
            "null",
            "long"
         ],
         "default":null
      },
      {
         "name":"transaction",
         "type":[
            "null",
            {
               "type":"record",
               "name":"ConnectDefault",
               "namespace":"io.confluent.connect.avro",
               "fields":[
                  {
                     "name":"id",
                     "type":"string"
                  },
                  {
                     "name":"total_order",
                     "type":"long"
                  },
                  {
                     "name":"data_collection_order",
                     "type":"long"
                  }
               ]
            }
         ],
         "default":null
      }
   ],
   "connect.name":"abc.tablename.Envelope"
}
"""


# Parse the JSON schema text into an Avro schema object. This rebinds `schema`
# from the JSON string to the parsed schema.
schema = avro.schema.Parse(schema)
# Module-level reader built from the parsed schema; decode_method_2() uses it
# for every message.
reader = DatumReader(schema)


def decode_method_2(msg_value, header_size=5):
    """Decode one registry-framed Avro message with the module-level reader.

    The payload is expected to start with a fixed-size registry header that is
    skipped, followed by the Avro binary body.

    Args:
        msg_value: Raw message value as bytes.
        header_size: Number of leading header bytes to skip before the Avro
            body. The default of 5 corresponds to the Confluent wire format
            (1 magic byte + 4-byte schema id).
            NOTE(review): Apicurio serializers reportedly write an 8-byte
            global id by default (a 9-byte header) unless configured for
            Confluent compatibility; a wrong offset misaligns the decoder.
            Confirm against the producer configuration.

    Returns:
        The datum returned by ``reader.read`` for the Avro body.

    Raises:
        ValueError: If ``msg_value`` is ``None`` or shorter than the header.
    """
    # Fail early with a clear message instead of an opaque decoder error.
    if msg_value is None or len(msg_value) < header_size:
        raise ValueError(
            "message value is missing or shorter than the "
            "{}-byte registry header".format(header_size)
        )

    message_bytes = io.BytesIO(msg_value)
    # Position the stream just past the registry header.
    message_bytes.seek(header_size)
    decoder = BinaryDecoder(message_bytes)
    return reader.read(decoder)

def main():
    """Consume messages from ``TOPIC`` and print each decoded Avro value.

    Starts from the latest offset with auto-commit disabled. The consumer is
    always closed when the loop ends, whether normally or by an exception.
    """
    print("Listening *****************")

    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=[BROKER],
        auto_offset_reset='latest',
        enable_auto_commit=False
    )

    try:
        for msg in consumer:
            msg_value = msg.value
            if msg_value is None:
                # A record with a null value (tombstone) has no Avro body.
                continue
            print("\n")
            print("decode_method_2", decode_method_2(msg_value))
            print("\n")
    finally:
        # Release the consumer's network resources even on error or Ctrl-C.
        consumer.close()

# Only start consuming when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()

我遇到的错误如下:

AssertionError: b'\xf0@\x02 ABCRequest\x161.9.7.Final\x0cabc-xyz-xyz.abc.com\xb0\xe4\xdb\xd0\xf2b\x00\x08true\x0esystem\x00\x10abccard tablename\x00\x02\x12601987152\x00\x00\x00\x02\x00\x00\x02r\x02\xac\x90\xc9\xbf\xf2b\x00'

python apache-kafka avro confluent-schema-registry
1个回答
0
投票
  1. 您不需要同时安装和导入以下两个库,二者选其一即可:

    kafka-python
    confluent-kafka-python

  2. 您的 schema 应该已经保存在注册表(Schema Registry)中,因此无需在代码中以字符串形式提供并解析它。相反,您应该通过注册表的 HTTP 接口下载它。

  3. confluent-kafka 库的文档展示了如何消费其注册表中的 Avro 数据,这同样适用于 Apicurio。具体来说,您不需要自行解析 Avro schema,而是向消费者提供一个 AvroDeserializer 实例。

    https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py

© www.soinside.com 2019 - 2024. All rights reserved.