刚刚检查了日志并发现
ConsumerRecord
标题以绝对不可读的方式打印,原因是 RecordHeader
将 value
保留为 byte[]
,结果在日志中我看到以下内容:
headers = RecordHeaders(headers = [RecordHeader(key = kafka_exception-fqcn, value = [111, 114, 103, 46, 115, 112, 114, 105, 110, 103, 102, 114, 46, 108, 105, 115) , 116, 101, 110, 101, 114, 46, 84, 1 100, 69, 120, 99, 101, 112, 116, 105, 111, 110]), RecordHeader(key = kafka_exception-message, value = [76 , 105, 115, 116, 101, 110, 101, 114, 32, 102, 97, 105, 108, 101, 100, 59, 32, 105, 115, 32, 111, 114, 115, 112, 114, 105 , 110, 103, 102, 114, 97, 109, 101, 119, 111])
是否可以打印代替
byte[]
人类可读的 String
表示?
要使用
@KafkaListener
注释在 Spring Kafka 消费者中打印 Kafka 消息头:
@KafkaListener(
id = "myKafkaListener",
groupId = "myConsumerGroup",
topics = "myTopic",
autoStartup = "true",
idIsGroup = false,
containerFactory = "myContainerFactory"
)
public void consume(@Payload String record, @Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
@Headers Map<String, Object> headers) {
// Print the payload
System.out.println("Received message: " + record);
// Print the individual headers
System.out.println("Offset: " + offset);
System.out.println("Partition: " + partition);
System.out.println("Topic: " + topic);
System.out.println("Timestamp: " + ts);
// Print all headers
headers.forEach((key, value) -> {
System.out.println("Header - " + key + ": " + value);
});
}
在此代码中:
@KafkaListener
用于配置监听器。@Headers
参数打印负载、各个标头(例如偏移量、分区、主题和时间戳)以及所有标头。确保将示例值(例如“myKafkaListener”、“myConsumerGroup”、“myTopic”和“myContainerFactory”)替换为您在应用程序中使用的实际值。