如何更改 SubjectNameStrategy 并在 confluent-kafka-python 的 AvroProducer 中使用不同的模式?

问题描述 投票:0回答:2

我使用 confluent-kafka-python 的 AvroProducer 进行序列化。如何将 SubjectNameStrategy 更改为 RecordNameStrategy,以便我可以在同一主题中使用不同的模式?或者有更好的方法来实现同样的目标吗?

apache-kafka avro confluent-schema-registry confluent-kafka-python
2个回答
0
投票

尝试将 AvroSerializer 中的

subject.name.strategy
设置为
record_subject_name_strategy

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#avroserializer


0
投票

这里是如何在同一主题中实现多个模式的示例:

from confluent_kafka.schema_registry import  topic_record_subject_name_strategy
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.avro import SerializerError
import avro.schema
from faker import Faker

# Shared Faker instance; used further down to generate the key timestamp
# (fake.unix_time()) and a person name (fake.name()).
fake = Faker()

# Avro schema (JSON text) for the first kind of value record: "my.test.value1",
# a record with two string fields, "language" and "greeting".
value_schema_str_1 = """
{
   "namespace": "my.test",
   "name": "value1",
   "type": "record",
   "fields" : [
     {
       "name" : "language",
       "type" : "string"
     },
     {
       "name" : "greeting",
       "type" : "string"
     }
   ]
}
"""

# Avro schema (JSON text) for the second kind of value record: "my.test.value2",
# a record with a string field "name" and an int field "age".
value_schema_str_2 = """
{
   "namespace": "my.test",
   "name": "value2",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     },
     {
       "name" : "age",
       "type" : "int"
     }
   ]
}
"""

# Avro schema (JSON text) for the message key: "my.test.key", a record with a
# single long field "timestamp".
key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "timestamp",
       "type" : "long"
     }
   ]
}
"""

# Parse the schema strings with the avro library.
# NOTE(review): the parsed objects are not used by any code visible in this
# script -- the serializers below are built from the raw schema strings -- so
# these calls presumably only act as an early validity check of the JSON.
value_schema_1 = avro.schema.parse(value_schema_str_1)
value_schema_2 = avro.schema.parse(value_schema_str_2)
key_schema = avro.schema.parse(key_schema_str)

# Create a Schema Registry client pointing at a registry on localhost:8081
schema_registry_client = SchemaRegistryClient({
    'url': 'http://localhost:8081'
})

# Create one AvroSerializer per value schema from the raw schema strings and
# the shared Schema Registry client. Both value serializers override
# 'subject.name.strategy' with topic_record_subject_name_strategy, which is
# what lets two different record types share one topic (presumably the subject
# becomes "<topic>-<record name>"; confirm against the library docs).
# The key serializer passes no conf, so the library's default strategy applies.
value_serializer_1 = AvroSerializer(schema_str=value_schema_str_1,
                                    schema_registry_client=schema_registry_client,
                                    conf={'subject.name.strategy': topic_record_subject_name_strategy})
value_serializer_2 = AvroSerializer(schema_str=value_schema_str_2,
                                    schema_registry_client=schema_registry_client,
                                    conf={'subject.name.strategy': topic_record_subject_name_strategy})
key_serializer = AvroSerializer(schema_str=key_schema_str,
                                schema_registry_client=schema_registry_client)

# Function to create and send a message with a specific value serializer
def send_message(value_serializer, value):
    """Serialize ``value`` and publish it to the ``example-stream-avro`` topic.

    A new ``SerializingProducer`` is created on every call because the value
    serializer is part of the producer configuration and this script uses a
    different serializer per schema. The message key is a record holding a
    Faker-generated Unix timestamp, serialized with the module-level
    ``key_serializer``.

    Failures are reported on stdout instead of being raised to the caller.

    Args:
        value_serializer: Serializer applied to ``value`` (an
            ``AvroSerializer`` in this script).
        value: Dict matching the schema that ``value_serializer`` was
            built from.
    """
    # SerializingProducer.produce() reports serializer failures as
    # KeySerializationError / ValueSerializationError, which derive from
    # confluent_kafka.serialization.SerializationError -- not from the legacy
    # confluent_kafka.avro.SerializerError. Catch both so serialization
    # problems are reported as such instead of as generic production failures.
    from confluent_kafka.serialization import SerializationError

    # SerializingProducer configuration
    producer_config = {
        'bootstrap.servers': 'localhost:9092',
        'key.serializer': key_serializer,
        'value.serializer': value_serializer,
    }

    try:
        serializing_producer = SerializingProducer(producer_config)
        key = {"timestamp": int(fake.unix_time())}
        serializing_producer.produce(topic='example-stream-avro', value=value, key=key)
        # Block until the queued message has been handed to the broker.
        serializing_producer.flush()
        print("Message sent successfully")
    except (SerializerError, SerializationError) as e:
        print(f"Message serialization failed: {e}")
    except Exception as e:
        # Top-level boundary of this demo script: report and carry on.
        print(f"Message production failed: {e}")

# Build one payload per value schema: value1 matches "my.test.value1",
# value2 matches "my.test.value2" (with a Faker-generated person name).
value1 = {"language": "ENGLISH", "greeting": "Hello, World!"}
name = fake.name()
value2 = {"name": name, "age": 30}

# Publish both payloads to the same topic, each with its own serializer.
send_message(value_serializer_1, value1)
send_message(value_serializer_2, value2)
© www.soinside.com 2019 - 2024. All rights reserved.