Kafka 类型标头未被生产者/消费者删除

问题描述 投票:0回答:1

应用程序.属性

spring.cloud.function.definition=test
spring.kafka.bootstrap-servers=server-name-comes-here
spring.cloud.stream.kafka.binder.producerProperties.spring.json.add.type.headers=false
spring.cloud.stream.kafka.binder.consumerProperties.spring.json.use.type.headers=false
spring.cloud.stream.bindings.test-in-0.destination=${input.topic.name}
spring.cloud.stream.bindings.test-in-0.group=group

spring.cloud.stream.bindings.test-out-vne.destination=${output.topic.name}
spring.cloud.stream.bindings.test-out-vne.destination.transacted=true

输出标头:

[
  {
    "key": "jmsId",
    "stringValue": "1234"
  },
  {
    "key": "target-protocol",
    "stringValue": "kafka"
  },
  {
    "key": "UUID",
    "stringValue": "12334"
  },
  {
    "key": "contentType",
    "stringValue": "application/json"
  },
  {
    "key": "spring_json_header_types",
    "stringValue": "{\"jmsId\":\"java.lang.String\",\"UUID\":\"java.lang.String\",\"contentType\":\"java.lang.String\",\"target-protocol\":\"java.lang.String\"}"
  }
]

我尝试在 Producer 中使用 ADD_TYPE_INFO_HEADERS - false ,在 comsumer 中使用 USE_TYPE_INFO_HEADERS-false ,但输出标头仍然带有类型信息, 请建议如何获取标题,如下所示,

{
"jmsId": "1234",
"UUID": "12334"
}

我在 Producer 配置中使用 ByteArrayJsonConverter,有效负载工作正常,但标头具有附加类型信息,需要删除类型信息并需要如下格式

期望输出:

{
"jmsId": "1234",
"UUID": "12334"
}
apache-kafka spring-kafka spring-cloud-stream-binder-kafka spring-cloud-stream-binder
1个回答
0
投票

如果您在 Spring Cloud Stream 应用程序中使用该主题(来自出站主题),您将无法看到这些标头,除非您添加自定义标头映射器。尽管您将

add_type_header
设置为
false
,但标题仍会以
spring_json_header_types
形式写入主题。该标志用于 JSON 序列化器添加
TYPE_ID
标头。如果您不希望
spring_json_header_types
不作为 Kafka 主题记录的一部分出去,您需要在应用程序中提供一个自定义标头映射器,如下所示。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
  BinderHeaderMapper headerMapper = new BinderHeaderMapper();
  headerMapper.setMapAllStringsOut(true);
  return headerMapper;
}

请注意,我们将

mapAllStringsOut
属性设置为 true,以便我们映射任何字符串值,并且由于不会出现任何值,因此
spring_json_header_types
标头将不会发送到 Kafka 主题。

© www.soinside.com 2019 - 2024. All rights reserved.