我有一个接受一批消息的 Kafka 监听器。 我需要从此侦听器获取自定义标头列表,但它给我一个错误,表明找不到标头。
@KafkaListener(id = KAFKA_LISTENER_ID,
topics = "${kafka.event-topics.someTopic}",
properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(@Payload
List<SomeEventConsumedV1> messages,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header("CUSTOM_HEADER") List<Long> sendTimeHeaders,
Acknowledgment acknowledgment) {
在此代码中,应用程序无法读取 @Header("CUSTOM_HEADER") 列表 sendTimeHeaders
有一种方法可以通过给方法来解决:
@KafkaListener(id = KAFKA_LISTENER_ID,
topics = "${kafka.event-topics.someTopic}",
properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(List<Message<SomeEventConsumedV1>> messages) {
messages.map(msg -> msg.getHeaders().get("CUSTOM_HEADER"))........
}
但是也许有办法用@Header方法来解决它?
批处理依赖于
BatchMessagingMessageConverter
。默认情况下,它不执行任何自定义标头映射。
您可以考虑将自定义的标头注入您的
ListenerContainerFactory
中,或者仅从该转换器从 KafkaHeaders.NATIVE_HEADERS
标头中提供的标头中提取标头。
如果您依赖 JSON 转换,则涉及
DefaultKafkaHeaderMapper
,它会将转换后的标头提供为 KafkaHeaders.BATCH_CONVERTED_HEADERS
作为 List<Map<String, Object>>
。
我建议当您有
@Headers Map<>
参数时进行调试,看看哪一个适合您的场景。