如何查看kafka标题

问题描述 投票:2回答:2

我们使用org.apache.kafka.clients.producer.ProducerRecord向Kafka发送带有标头的消息

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

如何使用命令实际查看这些标头。 kafka-console-consumer.sh只显示有效负载而没有标题。

apache-kafka kafka-producer-api
2个回答
8
投票

您可以使用优秀的kafkacat工具。

示例命令:

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

样本输出:

Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 34
  Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
 due to serialization error:

kafkacat标头选项仅适用于kafkacat的最新版本;如果您当前的版本不包含它,您可能希望自己从master分支机构获取build


2
投票

来自kafka-console-consumer.sh脚本:

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

src:https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh

kafka.tools.ConsoleConsumer中,标题提供给Formatter,但现有的Formatters都没有使用它:

formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
                                     msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
                                     output)

src:https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala

在上面链接的底部,您可以看到现有的Formatters。

如果你想打印标题,你需要实现自己的kafka.common.MessageFormatter,特别是它的write方法:

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit

然后使用--formatter运行你的控制台使用者,提供你自己的格式化程序(它也应该存在于类路径中)。

另一种更简单,更快捷的方法是使用KafkaConsumer实现自己的迷你程序并在调试中检查标头。

© www.soinside.com 2019 - 2024. All rights reserved.