我没有收到 Kafka 内容中的密钥 - Python/confluence

问题描述 投票:0回答:1

我正在尝试使用 Python 脚本接收来自该主题的消息。

from confluent_kafka import Consumer, KafkaError
import uuid

# Kafka broker details
broker = "something"
topic = "something"
group = str(uuid.uuid4())

# Kafka consumer configuration
conf = {
    "bootstrap.servers": broker,
    "group.id": group,
    "auto.offset.reset": "earliest"  
}

consumer = Consumer(conf)

# Subscribe to the topic
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(1.0)  # Poll for new messages
        
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print("Reached end of partition")
            else:
                print("Error: {}".format(msg.error().str()))
        else:
            # Decode the received message value as UTF-8
            received_message = msg.value().decode("utf-8", errors="ignore")

            # Print the received message
            print("Received message:", received_message)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

所以 是 kafka ui,这样你就可以看到密钥。我得到的是 H 而不是密钥名称。

这是我的输出:

收到消息: H2ff4c4d5-74ee-4708-b553-aa68ea5e675bHdfb821d1-db91-44b8-a080-8f689a80bb32H2ff4c4d5-74ee-4708-b553-aa68ea5e675bCREATEIntegrationCATALOGNode_[{"id":"2 ff4c4d5-74ee-4708-b553-aa68ea5e675b","name":"集成" ,"classType":"EntityClass","graphType":"Node","objectType":"集成","enabled":true,"tenantDefault":true.....

这是我下载时的kafka消息(看看如何有标题而不是H):

“ {” ID“:” 100DEA7F-BC9D-4775-B660-17B9999FD1439A“,“ tenant_id”:“ D257587B-D53F-498A-9256-9256-49256-4D1851B81B847D8”, -b660-17b99fd1439a"},"操作":{"string":"CREATE"},"type":{"string":"Genericentity"},"base_type":{"string":"CATALOG"}," graph_type":{"string":"Node"},"json_payload":{"string":"[{\"id\":\"100dea7f-bc9d-4775-b660-17b99fd1439a\",\"name\" :\"LSH - 安全库存\"、\"classType\":\"EntityClass\"、\"graphType\":\"Node\"、\"objectType\":\"LSHSafetyStock\"、\"enabled\ ":true,\"tenantDefault\":false,\"catalogType\":\"genericentity\",\"type\":\"Genericentity\",\"cdmFields\":[{\"id\": \“2e1f4381-cf61-49a5-934b-6b47a93d1771 \”,\“名称\”:\“baseTypeKey \”,\“displayLabel \”:\“基本类型密钥\”,\“类型\”:\“STRING \ ",\"必需\":false,\"大小\":20,\"单位\":\"默认\",\"值\":\"\",\"正则表达式\":null,\ “唯一\”:假,\“默认值\”:空,\“组\”:\“系统\”,\“自动生成\”:假,\“格式\”:\“\”,\“工具提示\ ":\"\"、\"可搜索\":false、\"可过滤\":false、\"可编辑\":false、\"entityName\":null、\"entityKey\":null、\"entityValue \":null,\"可显示\":false,\"instanceUserEditable\":false,\"instanceUserCreatable\":false,\"apiFlag\":false,\"apiUrl\":\"\",\" apiResponseSelectKey\":\"\",\"apiResponseSelectValue\":\"\",\"order\":-1,\"tenantDefault\":false,\"isDisplayableOnSummary\":false,\"isDisplayableOnDetails\" :false,\"isDisplayableOnCatalog\":false,\"isDisplayableOnList\":false,\"dataClassification\":\"默认\"},{\"id\":\"cf7a08cb-5f7f-472b-a9eb-516ca864a188 \",\"名称\":\"entityType\",\"displayLabel\":\"实体类型\",\"类型\":\"STRING\",\"必填\":false,\"大小\":20、\"单位\":\"默认\"、\"值\":\"\"、\"正则表达式\":null、\"唯一\":false、\"defaultValue\" :null,\"组\":\"系统\",\"自动生成\":false,\"格式\":\"\",\"工具提示\":\"\",\"可搜索\" :假,\“可过滤\”:假,\“可编辑\”:假,\“entityName \”:空,\“entityKey \”:空,\“entityValue \”:空,\“可显示\”:假,\"实例用户E

python apache-kafka kafka-consumer-api confluent-kafka-python data-engineering
1个回答
0
投票

该主题的数据似乎以与您预期不同的格式编写。您使用模式注册表吗?也许它是用 Confluence Wire 格式编写的。如果 Kafka UI 设置正确,它会使用模式注册表中的模式自动解码消息。

Kafka UI 中选择了哪个值的 serde?如果它类似于“模式注册表”,那么这里有一些关于如何将模式注册表与 python 客户端集成的资源 https://developer.confluence.io/courses/kafka-python/integrate-with-schema-registry/

© www.soinside.com 2019 - 2024. All rights reserved.