我在我的Python项目中使用confluence_kafka==2.2.0。我想为一个主题设置多个模式。我正在阅读 AvroSerializer 的文档,它说它有设置 RecordNameStrategy 的配置选项(请参阅此处的文档https://docs.confluence.io/platform/current/clients/confluence-kafka-python/html/index.html #avroserializer)。此外,在该库的版本中,它表示它开始支持非默认模式策略(请参见此处https://github.com/confluenceinc/confluence-kafka-python/releases/tag/v1.4.0)。然而,由于两个原因,我坚持实际这样做:
import asyncio
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.schema_registry import record_subject_name_strategy, SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from config import Config
# Utility functions
def create_admin(config: dict):
return AdminClient(config)
async def create_new_topic(admin: AdminClient, topic_name: str):
if topic_name in admin.list_topics().topics:
print("Already exist!")
return
futures = admin.create_topics([
NewTopic(topic_name, num_partitions=1, replication_factor=1),
])
await asyncio.wrap_future(futures[topic_name])
print("Topic created!")
# Main function
async def main():
admin = create_admin(Config.KAFKA)
await create_new_topic(admin, "test_multischema")
schema_registry_client = SchemaRegistryClient(Config.SCHEMA_REGISTRY)
# Reading schema strings
with open("event1.avsc", "r") as f:
schema_1_str = f.read()
with open("event2.avsc", "r") as f:
schema_2_str = f.read()
# Here is the problem
producer_config = Config.KAFKA | {
"key.serializer": StringSerializer(),
"value.serializer": AvroSerializer(
schema_registry_client,
# This serializer should be able to deserialize messages of both schema_1 and schema_2.
# How should I write config to pass both to one producer?
schema_1_str,
conf={
# What args should I pass?
"subject.name.strategy": record_subject_name_strategy()
}
)
}
producer = SerializingProducer(producer_config)
# After I will produce msgs and then consume them
也许有人可以提供如何配置 SerializingProducer 和 DeserializingConsumer 的代码片段,以便他们能够使用这两种模式读取主题?
尝试在文档中寻找解决方案,但它要么声明尚不支持(旧的),要么仅声明策略,并且没有提供有关其在代码中实际外观的示例(就像这里如何更改SubjectNameStrategy和在 confluence-kafka-python 的 AvroProducer 中使用不同的模式?)。
没有提供关于它在代码中实际应该是什么样子的示例(就像这里
感谢您找到我的其他答案。文档指出这里。但这个文件不包含策略参数,不。
https://github.com/confluenceinc/confluence-kafka-python/blob/master/examples/avro_ Producer.py
可调用只是一个函数
serializer = AvroSerializer(conf={
'subject.name.strategy': lambda (x): return x
})