Magic v1 不支持记录头?

问题描述 投票:0回答:2

我使用 spring-boot (2.2.4)、spring-kafka (2.3.5)、Gradle 和 Kotlin。 我想使用标头来映射类型,但我不明白该怎么做...

我查看spring-kafka参考并编写下一个代码:

我的配置:

@Configuration
class KafkaProducerConfig {

fun defaultConfig(): HashMap<String, Any> {
    var config = HashMap<String, Any>()
    config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = IntegerSerializer::class.java
    config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
    config[JsonSerializer.TYPE_MAPPINGS] = "order:${Order::class.java}, client:${Client::class.java}"

    return config
}

@Bean
fun orderTemplate(): KafkaTemplate<Int, Order> {
    val factory = DefaultKafkaProducerFactory<Int, Order>(defaultConfig(),
            IntegerSerializer(),
            JsonSerializer<Order>())
    return KafkaTemplate(factory)
}
}

我发送数据:

@Service
class ProducerService(@Autowired val orderSender: KafkaTemplate<Int, Order> { 

@Scheduled(fixedDelay = 2000)
fun send() {
    orderSender.send("order.t", 1, Order())
}

我收到以下错误:

Magic v1 不支持记录头

请帮助我。

spring-kafka
2个回答
0
投票

这意味着你的 Kafka Broker 太旧了(< 0.11.0.0) to support headers.

JSON 序列化器(默认情况下)添加类型信息来记录标头,以便接收系统有一些关于如何反序列化的提示。

如果您必须使用这样的旧代理,您可以使用

关闭发送标头
/**
 * Set to false to disable adding type info headers.
 * @param addTypeInfo true to add headers.
 * @since 2.1
 */
public void setAddTypeInfo(boolean addTypeInfo) {
    this.addTypeInfo = addTypeInfo;
}

在序列化器上。


0
投票

你需要使用

kafka 消息格式版本为 0.11.0-IV2,这在 PRODUCER 和 CONSUMER 主题上也是如此,并且不会再次观察到。

© www.soinside.com 2019 - 2024. All rights reserved.