如何在Kafka Schema Registry中编程注册Avro Schema?

问题描述 投票:1回答:2

我把数据和schema放到kafka和schema注册表中,用python。

from confluent_kafka import avro

from confluent_kafka.avro import AvroProducer



# Avro schema (JSON text) for the message value: a record named "myrecord"
# with four fields. ID, QUANTITY and PRICE are ["null", "int"] unions and
# PRODUCT is a ["null", "string"] union; every field defaults to null.
# The string is parsed with avro.loads() in the __main__ section.
value_schema_str = """

    {

       "type":"record",

       "name":"myrecord",

       "fields":[

          {

             "name":"ID",

             "type":[

                "null",

                "int"

             ],

             "default":null

          },

          {

             "name":"PRODUCT",

             "type":[

                "null",

                "string"

             ],

             "default":null

          },

          {

             "name":"QUANTITY",

             "type":[

                "null",

                "int"

             ],

             "default":null

          },

          {

             "name":"PRICE",

             "type":[

                "null",

                "int"

             ],

             "default":null

          }

       ]

    }

    """



# Avro schema (JSON text) for the message key: a record named "key_schema"
# with a single required int field "ID" (no null branch, no default).
# The string is parsed with avro.loads() in the __main__ section.
key_schema_str = """

    {

       "type":"record",

       "name":"key_schema",

       "fields":[

          {

             "name":"ID",

             "type":"int"

          }

       ]

    }

    """





def delivery_report(err, msg):
    """Print the delivery outcome of one produced message.

    Invoked once per produced message, triggered by poll() or flush().

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The produced message; only its topic() and partition()
            accessors are used, and only on success.
    """
    if err is None:
        report = 'Message delivered to {} [{}]'.format(msg.topic(), msg.partition())
    else:
        report = 'Message delivery failed: {}'.format(err)
    print(report)





if __name__ == '__main__':
    # Parse the Avro schema definitions declared at module level.
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)

    # Sample record and its key, using the field names from the schemas.
    value = {
        "ID": 199,
        "PRODUCT": "Yagiz Gulbahar",
        "QUANTITY": 1453,
        "PRICE": 61,
    }
    key = {"ID": 199}

    # Broker address, Schema Registry URL and the per-message delivery callback.
    producer_settings = {
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081',
    }
    avroProducer = AvroProducer(
        producer_settings,
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )

    avroProducer.produce(topic='ersin_test_2', key=key, value=value)
    # delivery_report is triggered from flush() (see its docstring).
    avroProducer.flush()

我可以只放schema不放数据吗?

先谢谢你

python apache-kafka avro confluent-schema-registry kafka-python
2个回答
1
投票

你也可以用以下方法注册一个模式 confluent-kafka-pythonpython-schema-registry-client.


使用 confluent-kafka-python

# SchemaRegistryClient and Schema are provided by the schema_registry
# subpackage of confluent-kafka-python, not by confluent_kafka.avro.
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient


# Avro schema to register, as JSON text.
schema_str = """
    {
        "type": "record",
        "name": "user",
        "fields": [
            {"name": "firstName", "type": "string"},
            {"name": "age",  "type": "int"}
        ]
    }
"""

avro_schema = Schema(schema_str, 'AVRO')
# The client is configured with a dict; the registry address goes under 'url'.
sr = SchemaRegistryClient({'url': "http://schema-registry-host:8081"})
# Registers the schema under the subject without producing any message;
# the call returns the schema ID assigned by the registry.
_schema_id = sr.register_schema("yourSubjectName", avro_schema)

使用 python-schema-registry-client

from schema_registry.client import SchemaRegistryClient, schema


# Avro schema to register, given as a Python dict.
schema_ = schema.AvroSchema({
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "firstName", "type": "string"},
        {"name": "age",  "type": "int"}
    ]
})

# Create this library's own client; it is not interchangeable with the
# confluent-kafka-python client, which has no register() method.
sr = SchemaRegistryClient(url="http://schema-registry-host:8081")

# Registers the schema under the subject without producing any message;
# register() returns the ID the registry assigned to the schema.
my_schema = sr.register("yourSubjectName", schema_)
© www.soinside.com 2019 - 2024. All rights reserved.