Spring-kafka:打印RecordHeaders的正确方法是什么?

问题描述 投票:0回答:1

刚刚检查了日志并发现

ConsumerRecord
标题以绝对不可读的方式打印,原因是
RecordHeader
value
保留为
byte[]
,结果在日志中我看到以下内容:

headers = RecordHeaders(headers = [RecordHeader(key = kafka_exception-fqcn, value = [111, 114, 103, 46, 115, 112, 114, 105, 110, 103, 102, 114, 46, 108, 105, 115) , 116, 101, 110, 101, 114, 46, 84, 1 100, 69, 120, 99, 101, 112, 116, 105, 111, 110]), RecordHeader(key = kafka_exception-message, value = [76 , 105, 115, 116, 101, 110, 101, 114, 32, 102, 97, 105, 108, 101, 100, 59, 32, 105, 115, 32, 111, 114, 115, 112, 114, 105 , 110, 103, 102, 114, 97, 109, 101, 119, 111])

是否可以打印代替

byte[]
人类可读的
String
表示?

spring apache-kafka kafka-consumer-api spring-kafka
1个回答
0
投票

要使用

@KafkaListener
注释在 Spring Kafka 消费者中打印 Kafka 消息头:

@KafkaListener(
    id = "myKafkaListener",
    groupId = "myConsumerGroup",
    topics = "myTopic",
    autoStartup = "true",
    idIsGroup = false,
    containerFactory = "myContainerFactory"
)
public void consume(@Payload String record, @Header(KafkaHeaders.OFFSET) Long offset,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                   @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
                   @Headers Map<String, Object> headers) {
    // Print the payload
    System.out.println("Received message: " + record);

    // Print the individual headers
    System.out.println("Offset: " + offset);
    System.out.println("Partition: " + partition);
    System.out.println("Topic: " + topic);
    System.out.println("Timestamp: " + ts);

    // Print all headers
    headers.forEach((key, value) -> {
        System.out.println("Header - " + key + ": " + value);
    });
}

在此代码中:

  • @KafkaListener
    用于配置监听器。
  • 该方法使用 Kafka 消息,并使用
    @Headers
    参数打印负载、各个标头(例如偏移量、分区、主题和时间戳)以及所有标头。

确保将示例值(例如“myKafkaListener”、“myConsumerGroup”、“myTopic”和“myContainerFactory”)替换为您在应用程序中使用的实际值。

© www.soinside.com 2019 - 2024. All rights reserved.