如何使用confluence_kafka python客户端为主题设置多个模式?

问题描述 投票:0回答:1

我在我的Python项目中使用confluence_kafka==2.2.0。我想为一个主题设置多个模式。我正在阅读 AvroSerializer 的文档,它说它有设置 RecordNameStrategy 的配置选项(请参阅此处的文档https://docs.confluence.io/platform/current/clients/confluence-kafka-python/html/index.html #avroserializer)。此外,在该库的版本中,它表示它开始支持非默认模式策略(请参见此处https://github.com/confluenceinc/confluence-kafka-python/releases/tag/v1.4.0)。然而,由于两个原因,我坚持实际这样做:

  1. 我有几个带有架构的 .avsc 文件。如何向 AvroSerializer 提供多个架构?它仅接收 1 个模式参数。
  2. confluence_kafka.schema_registry.record_subject_name_strategy 接收 2 个参数:ctx 和 record_name,我应该在那里传递什么? 我的代码如下所示:
import asyncio

from confluent_kafka.admin import AdminClient, NewTopic

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.schema_registry import record_subject_name_strategy, SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

from config import Config


# Utility functions
def create_admin(config: dict):
    return AdminClient(config)


async def create_new_topic(admin: AdminClient, topic_name: str):
    if topic_name in admin.list_topics().topics:
        print("Already exist!")
        return

    futures = admin.create_topics([
        NewTopic(topic_name, num_partitions=1, replication_factor=1),
    ])

    await asyncio.wrap_future(futures[topic_name])
    print("Topic created!")


# Main function
async def main():
    admin = create_admin(Config.KAFKA)
    await create_new_topic(admin, "test_multischema")

    schema_registry_client = SchemaRegistryClient(Config.SCHEMA_REGISTRY)
    
    # Reading schema strings
    with open("event1.avsc", "r") as f:
        schema_1_str = f.read()
    with open("event2.avsc", "r") as f:
        schema_2_str = f.read()

    # Here is the problem
    producer_config = Config.KAFKA | {
        "key.serializer": StringSerializer(),
        "value.serializer": AvroSerializer(
            schema_registry_client,
            # This serializer should be able to deserialize messages of both schema_1 and schema_2.
            # How should I write config to pass both to one producer?
            schema_1_str,
            conf={
                # What args should I pass?
                "subject.name.strategy": record_subject_name_strategy()
            }
        )
    }
    producer = SerializingProducer(producer_config)
    # After I will produce msgs and then consume them

也许有人可以提供如何配置 SerializingProducer 和 DeserializingConsumer 的代码片段,以便他们能够使用这两种模式读取主题?

尝试在文档中寻找解决方案,但它要么声明尚不支持(旧的),要么仅声明策略,并且没有提供有关其在代码中实际外观的示例(就像这里如何更改SubjectNameStrategy和在 confluence-kafka-python 的 AvroProducer 中使用不同的模式?)。

python apache-kafka avro confluent-schema-registry confluent-kafka-python
1个回答
0
投票

没有提供关于它在代码中实际应该是什么样子的示例(就像这里

感谢您找到我的其他答案。文档指出这里。但这个文件不包含策略参数,不。

https://github.com/confluenceinc/confluence-kafka-python/blob/master/examples/avro_ Producer.py

可调用只是一个函数

https://github.com/confluenceinc/confluence-kafka-python/blob/master/src/confluence_kafka/schema_registry/avro.py#L223

serializer = AvroSerializer(conf={
  'subject.name.strategy': lambda (x): return x
})
© www.soinside.com 2019 - 2024. All rights reserved.