使用Spring Boot @KafkaListener处理批量消息时无法获取自定义@Header

问题描述 投票:0回答:1

我有一个接受一批消息的 Kafka 监听器。 我需要从此侦听器获取自定义标头列表,但它给我一个错误,表明找不到标头。

@KafkaListener(id = KAFKA_LISTENER_ID,
    topics = "${kafka.event-topics.someTopic}",
    properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
  public void consumeMessages(@Payload
                List<SomeEventConsumedV1> messages,
                @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                @Header("CUSTOM_HEADER") List<Long> sendTimeHeaders,
                Acknowledgment acknowledgment) {

在此代码中,应用程序无法读取 @Header("CUSTOM_HEADER") 列表 sendTimeHeaders

有一种方法可以通过给方法来解决:

@KafkaListener(id = KAFKA_LISTENER_ID,
    topics = "${kafka.event-topics.someTopic}",
    properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})

public void consumeMessages(List<Message<SomeEventConsumedV1>> messages) {

    messages.map(msg -> msg.getHeaders().get("CUSTOM_HEADER"))........

}

但是也许有办法用@Header方法来解决它?

spring-boot spring-kafka
1个回答
0
投票

批处理依赖于

BatchMessagingMessageConverter
。默认情况下,它不执行任何自定义标头映射。

您可以考虑将自定义的标头注入您的

ListenerContainerFactory
中,或者仅从该转换器从
KafkaHeaders.NATIVE_HEADERS
标头中提供的标头中提取标头。

如果您依赖 JSON 转换,则涉及

DefaultKafkaHeaderMapper
,它会将转换后的标头提供为
KafkaHeaders.BATCH_CONVERTED_HEADERS
作为
List<Map<String, Object>>

我建议当您有

@Headers Map<>
参数时进行调试,看看哪一个适合您的场景。

© www.soinside.com 2019 - 2024. All rights reserved.