如何使用confluent-kafka-python确定kafka主题是否存在

问题描述 投票:2回答:1

我正在使用confluent-kafka-python软件包与Kafka服务器进行接口。我可以成功创建主题并将事件推送到该主题。但是,我的问题出在我启动多个节点(在Docker中运行)时,如果另一个实例也尝试创建该主题,则会出现错误。在创建新主题之前,我需要首先检查该主题是否已经存在。

from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})

# First check here if the topic already exists!
if not topic_exists(topic):  # <-- how to accomplish this?
    new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
    results = kafka_admin.create_topics([new_kafka_topic])

感谢您的帮助!

python apache-kafka confluent confluent-kafka-python
1个回答
0
投票

我遇到了同样的问题,并通过以下方式进行了处理:

client = AdminClient({"bootstrap.servers": BROKER_URL})
topic_metadata = client.list_topics()
if topic_metadata.topics.get(self.topic_name) is None:
  self.create_topic()
© www.soinside.com 2019 - 2024. All rights reserved.