我想将包含多行的文本作为一条消息推送到kafka主题中。
输入后:
kafka-console-producer --broker-list localhost:9092 --topic myTopic
并复制我的文字:
My Text consists of:
two lines instead of one
我在kafka主题中收到两条消息,但我想只有一条消息。任何想法如何实现?谢谢
您可以使用kafkacat
,使用-D
运算符指定自定义消息分隔符(在此示例中为/
):
kafkacat -b kafka:29092 \
-t test_topic_01 \
-D/ \
-P <<EOF
this is a string message
with a line break/this is
another message with two
line breaks!
EOF
请注意,分隔符必须是单个字节 - 多字节字符最终将包含在生成的消息中See issue #140
产生的消息,也使用kafkacat检查:
$ kafkacat -b kafka:29092 -C \
-f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n' \
-t test_topic_01
Key (-1 bytes):
Value (43 bytes): this is a string message
with a line break
Partition: 0 Offset: 0
--
Key (-1 bytes):
Value (48 bytes): this is
another message with two
line breaks!
Partition: 0 Offset: 1
--
% Reached end of topic test_topic_01 [0] at offset 2
检查使用kafka-console-consumer
:
$ kafka-console-consumer \
--bootstrap-server kafka:29092 \
--topic test_topic_01 \
--from-beginning
this is a string message
with a line break
this is
another message with two
line breaks!
(因此说明为什么kafkacat
比kafka-console-consumer
更好用,因为它可选的详细程度:))
使用kafka-console-producer
是不可能的,因为它使用了以换行符分隔的Java Scanner对象。
您需要通过自己的生产者代码来完成
使用Console-consumer,您显然正在运行来自客户端的预期数据的测试。如果它是单个消息,最好通过添加唯一分隔符作为标识符将其保留为单个字符串。例如
{这是第一行^^这是第二行}
然后在您的消费者工作中相应地处理消息。即使客户计划在消息中发送多个句子,最好在单个字符串中进行,它将改进消息的序列化,并在序列化后更有效。