显示无效字符,同时使用卡夫卡控制台消费者消费

问题描述 投票:0回答:2

在使用卡夫卡控制台消费者或KT(GoLang CLI工具,卡夫卡)从卡夫卡的话题费时,我越来越无效字符。

...
\u0000\ufffd?\u0006app\u0000\u0000\u0000\u0000\u0000\u0000\u003e@\u0001
\u0000\u000cSec-39\u001aSome Actual Value Text\ufffd\ufffd\ufffd\ufffd\ufffd
\ufffd\u0015@\ufffd\ufffd\ufffd\ufffd\ufffd\ufff
...

尽管卡夫卡连接实际上可以正确的数据接收到一个SQL数据库。

apache-kafka kafka-consumer-api confluent
2个回答
2
投票

既然你说

卡夫卡连接实际上可以正确的数据接收到一个SQL数据库。

我的假设是,你正在使用的Avro序列化有关该主题的数据。卡夫卡连接正确配置将采取的Avro数据和deserialise它。

然而,控制台工具,如kafka-console-consumerktkafkacat等不支持的Avro,所以你会得到一堆奇怪的字符,如果你使用它们从一个主题,是Avro的编码的读取数据。

要阅读Avro的数据,你可以使用kafka-avro-console-consumer命令行:

kafka-avro-console-consumer
         --bootstrap-server kafka:29092\
         --topic test_topic_avro \
         --property schema.registry.url=http://schema-registry:8081

编辑:添加从@CodeGeas建议过:

另外,阅读使用REST Proxy数据可以通过以下来完成:

# Create a consumer for JSON data
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         -H "Accept: application/vnd.kafka.v2+json" \
         --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \

# Subscribe the consumer to a topic
         http://kafka-rest-instance:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"topics":["YOUR-TOPIC-NAME"]}' \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

# Then consume some data from a topic using the base URL in the first response.
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

后来,事后删除消费者:

curl -X DELETE -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance

1
投票

默认情况下,控制台工具,消费者既反序列化消息键和值使用ByteArrayDeserializer但随后明显尝试将数据打印到使用默认格式化的命令行。

然而,这个工具允许自定义使用的反序列化和格式化。请参阅从帮助输出以下摘录:

--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
                                            print.timestamp=true|false
                                            print.key=true|false
                                            print.value=true|false
                                            key.separator=<key.separator>
                                            line.separator=<line.separator>
                                            key.deserializer=<key.deserializer>
                                            value.deserializer=<value.
                                           deserializer>
                                         Users can also pass in customized
                                           properties for their formatter; more
                                           specifically, users can pass in
                                           properties keyed with 'key.
                                           deserializer.' and 'value.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--key-deserializer <String:
  deserializer for key>
--value-deserializer <String:
  deserializer for values>

使用这些设置,你应该能够改变输出是你想要的。

© www.soinside.com 2019 - 2024. All rights reserved.