我正在尝试使用 Python 脚本接收来自该主题的消息。
from confluent_kafka import Consumer, KafkaError
import uuid
# Kafka broker details
broker = "something"
topic = "something"
group = str(uuid.uuid4())
# Kafka consumer configuration
conf = {
"bootstrap.servers": broker,
"group.id": group,
"auto.offset.reset": "earliest"
}
consumer = Consumer(conf)
# Subscribe to the topic
consumer.subscribe([topic])
try:
while True:
msg = consumer.poll(1.0) # Poll for new messages
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print("Reached end of partition")
else:
print("Error: {}".format(msg.error().str()))
else:
# Decode the received message value as UTF-8
received_message = msg.value().decode("utf-8", errors="ignore")
# Print the received message
print("Received message:", received_message)
except KeyboardInterrupt:
pass
finally:
consumer.close()
所以 是 kafka ui,这样你就可以看到密钥。我得到的是 H 而不是密钥名称。
这是我的输出:
收到消息: H2ff4c4d5-74ee-4708-b553-aa68ea5e675bHdfb821d1-db91-44b8-a080-8f689a80bb32H2ff4c4d5-74ee-4708-b553-aa68ea5e675bCREATEIntegrationCATALOGNode_[{"id":"2 ff4c4d5-74ee-4708-b553-aa68ea5e675b","name":"集成" ,"classType":"EntityClass","graphType":"Node","objectType":"集成","enabled":true,"tenantDefault":true.....
这是我下载时的kafka消息(看看如何有标题而不是H):
“ {” ID“:” 100DEA7F-BC9D-4775-B660-17B9999FD1439A“,“ tenant_id”:“ D257587B-D53F-498A-9256-9256-49256-4D1851B81B847D8”, -b660-17b99fd1439a"},"操作":{"string":"CREATE"},"type":{"string":"Genericentity"},"base_type":{"string":"CATALOG"}," graph_type":{"string":"Node"},"json_payload":{"string":"[{\"id\":\"100dea7f-bc9d-4775-b660-17b99fd1439a\",\"name\" :\"LSH - 安全库存\"、\"classType\":\"EntityClass\"、\"graphType\":\"Node\"、\"objectType\":\"LSHSafetyStock\"、\"enabled\ ":true,\"tenantDefault\":false,\"catalogType\":\"genericentity\",\"type\":\"Genericentity\",\"cdmFields\":[{\"id\": \“2e1f4381-cf61-49a5-934b-6b47a93d1771 \”,\“名称\”:\“baseTypeKey \”,\“displayLabel \”:\“基本类型密钥\”,\“类型\”:\“STRING \ ",\"必需\":false,\"大小\":20,\"单位\":\"默认\",\"值\":\"\",\"正则表达式\":null,\ “唯一\”:假,\“默认值\”:空,\“组\”:\“系统\”,\“自动生成\”:假,\“格式\”:\“\”,\“工具提示\ ":\"\"、\"可搜索\":false、\"可过滤\":false、\"可编辑\":false、\"entityName\":null、\"entityKey\":null、\"entityValue \":null,\"可显示\":false,\"instanceUserEditable\":false,\"instanceUserCreatable\":false,\"apiFlag\":false,\"apiUrl\":\"\",\" apiResponseSelectKey\":\"\",\"apiResponseSelectValue\":\"\",\"order\":-1,\"tenantDefault\":false,\"isDisplayableOnSummary\":false,\"isDisplayableOnDetails\" :false,\"isDisplayableOnCatalog\":false,\"isDisplayableOnList\":false,\"dataClassification\":\"默认\"},{\"id\":\"cf7a08cb-5f7f-472b-a9eb-516ca864a188 \",\"名称\":\"entityType\",\"displayLabel\":\"实体类型\",\"类型\":\"STRING\",\"必填\":false,\"大小\":20、\"单位\":\"默认\"、\"值\":\"\"、\"正则表达式\":null、\"唯一\":false、\"defaultValue\" :null,\"组\":\"系统\",\"自动生成\":false,\"格式\":\"\",\"工具提示\":\"\",\"可搜索\" :假,\“可过滤\”:假,\“可编辑\”:假,\“entityName \”:空,\“entityKey \”:空,\“entityValue \”:空,\“可显示\”:假,\"实例用户E
该主题的数据似乎以与您预期不同的格式编写。您使用模式注册表吗?也许它是用 Confluence Wire 格式编写的。如果 Kafka UI 设置正确,它会使用模式注册表中的模式自动解码消息。
Kafka UI 中选择了哪个值的 serde?如果它类似于“模式注册表”,那么这里有一些关于如何将模式注册表与 python 客户端集成的资源 https://developer.confluence.io/courses/kafka-python/integrate-with-schema-registry/