Confluent Python Avro Producer:数据{'..'}不是该模式的示例

问题描述 投票:0回答:1

我无法为特定架构生成数据,也无法理解原因。代码中包含为字典的示例数据是直接使用汇合的“ avro-random-generator”创建的,因此示例数据必须正确,因为它们是直接从架构派生的。 Schema Registry和Avro Random Generator都是Confluent工具,因此不可能是那里的工具生成的示例数据无法与他们的schema注册表一起使用。

这是架构:

{
  "type": "record",
  "name": "schemaV1",
  "namespace": "net.avro.schemaV1",
  "doc": "",
  "fields": [
    {
      "name": "orderId",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "offerId",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "redeemerId",
      "type": [
        "null",
        {
          "type": "string",
          "avro.java.string": "String"
        }
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "eventCancellationType",
      "type": "int",
      "doc": ""
    },
    {
      "name": "ruleIds",
      "type": {
        "type": "array",
        "items": {
          "type": "string",
          "avro.java.string": "String"
        },
        "doc": ""
      }
    },
    {
      "name": "eventOriginator",
      "type": {
        "type": "record",
        "name": "AvroEventPartnerV1",
        "doc": "",
        "fields": [
          {
            "name": "partnerShortName",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "doc": ""
          },
          {
            "name": "businessUnitShortName",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ],
            "doc": "",
            "default": null
          },
          {
            "name": "branchShortName",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ],
            "doc": "",
            "default": null
          }
        ]
      }
    },
    {
      "name": "roundedDelta",
      "doc": "",
      "type": {
        "type": "record",
        "name": "AvroAmountV1",
        "doc": "Amount with a currency",
        "fields": [
          {
            "name": "amount",
            "type": {
              "type": "bytes",
              "logicalType": "decimal",
              "precision": 21,
              "scale": 3
            },
            "doc": "The amount as a decimal number"
          },
          {
            "name": "currency",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "doc": ""
          }
        ]
      }
    },
    {
      "name": "rewardableLegalDelta",
      "type": [
        "null",
        "AvroAmountV1"
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "receiptNumber",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "referenceReceiptNumber",
      "type": [
        "null",
        {
          "type": "string",
          "avro.java.string": "String"
        }
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "eventEffectiveTime",
      "type": {
        "type": "long"
      },
      "doc": ""
    }
  ]
}

这是我的脚本:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer, ClientError, ValueSerializerError

    BOOTSTRAP_SERVER = 'localhost:9092'
    SCHEMA_REGISTRY = 'http://localhost:8081'
    TOPIC = 'topicV1'
    SCHEMA_PATH = 'schemas/schemaV1.avsc'


    def schemaReader(SCHEMA_PATH):
        with open(SCHEMA_PATH, 'r') as file:
            data = file.read()

        return data


    def main():
        kafka_config = {
            'bootstrap.servers': BOOTSTRAP_SERVER,
            'schema.registry.url': SCHEMA_REGISTRY
        }

        value_schema = avro.loads( schemaReader(SCHEMA_PATH) )


        null = None

        value = {
      "orderId": "a9bcc55f-e2c0-43d6-b793-ff5f295d051d",
      "offerId": "119475017578242889",
      "redeemerId": "1176a01b-b2dc-45a9-91cc-232361e14f99",
      "eventCancellationType": 0,
      "ruleIds": ["ID-IPM00001"],
      "eventOriginator": {"partnerShortName": 
      "partner","businessUnitShortName": null,"branchShortName": null},
      "roundedDelta": {"amount": "\u8463","currency": "PTS"},
      "rewardableLegalDelta": {"amount": "\u8463","currency": "EUR"},
      "receiptNumber": "19b2ff68-ed06-48f0-9ce9-d697c0eadc19",
      "referenceReceiptNumber": null,
      "eventEffectiveTime": 1569494696656
     }


        avroProducer = AvroProducer(kafka_config, default_value_schema=value_schema )
        avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
        avroProducer.flush()

    if __name__== "__main__":
        main()

这是我收到的回溯:

  File "producer.py", line 64, in <module>
    main()
  File "producer.py", line 60, in main
    avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
    value = self._serializer.encode_record_with_schema(topic, value_schema, value)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 115, in encode_record_with_schema
    return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 149, in encode_record_with_schema_id
    writer(record, outf)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
    return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
  File "/apps/python/python2.7/lib/python2.7/site-packages/avro/io.py", line 1042, in write
    raise AvroTypeException(self.writers_schema, datum)

avro.io.AvroTypeException: The datum {'..'} is not an example of the schema { ..}
python apache-kafka avro kafka-producer-api confluent-kafka
1个回答
0
投票

似乎问题是amount应该是字节类型,但是您具有正常的\u8463字符串。您提到的用于生成随机数据的库通过使用Java默认字符集创建字节字符串:https://github.com/confluentinc/avro-random-generator/blob/master/src/main/java/io/confluent/avro/random/generator/Generator.java#L373

但是,也许默认的不是iso-8859-1,这是java实现(参考实现)使​​用的:https://github.com/apache/avro/blob/bf47ec97e0b7f5701042fac067b73b421a9177b7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java#L220

© www.soinside.com 2019 - 2024. All rights reserved.