我们使用org.apache.kafka.clients.producer.ProducerRecord向Kafka发送带有标头的消息
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, (Long)null, key, value, headers);
}
如何使用命令实际查看这些标头。 kafka-console-consumer.sh只显示有效负载而没有标题。
您可以使用优秀的kafkacat工具。
示例命令:
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
样本输出:
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 34
Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
due to serialization error:
kafkacat标头选项仅适用于kafkacat
的最新版本;如果您当前的版本不包含它,您可能希望自己从master分支机构获取build。
来自kafka-console-consumer.sh
脚本:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
src:https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh
在kafka.tools.ConsoleConsumer
中,标题提供给Formatter,但现有的Formatters都没有使用它:
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
output)
src:https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
在上面链接的底部,您可以看到现有的Formatters。
如果你想打印标题,你需要实现自己的kafka.common.MessageFormatter
,特别是它的write方法:
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
然后使用--formatter运行你的控制台使用者,提供你自己的格式化程序(它也应该存在于类路径中)。
另一种更简单,更快捷的方法是使用KafkaConsumer实现自己的迷你程序并在调试中检查标头。