我有 kafka 主题,我可以在这些主题上以 avro 格式生成消息。 我使用架构注册表。 代码非常基本,如下所示:
// address are changed to localhost.
static {
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Assuming MyObject is serialized to a String
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro MyObject Producer");
props.put("schema.registry.url", "http://localhost:8081/");
}
public static void main(String[] args) throws InterruptedException {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
MyObject myObject = createMyObject(); // Create avro object
final ProducerRecord<String, MyObject> record = new ProducerRecord<>("MyTopic", null, myObject); // null as key
producer.send(record);
producer.flush();
System.out.println("Produced 1 record.");
}
在某些时候,我遇到了 kafka 代理问题,不得不重新启动它。
之后,对于我的一个主题(30 个主题中只有一个),我开始遇到错误:
org.apache.kafka.common.errors.SerializationException:检索 id 0 的 Avro 未知架构时出错
我必须返回分区的偏移量,以确定它可以再次反序列化对象的偏移量,并注意到它与 kafka 停机时间相匹配。
这发生在消费者方面。主要的是kafka connect。但为了测试,我还使用了 kafka-avro-console-consumer,它给了我同样的错误。
在架构注册表上我也有错误,例如:
io.confluence.rest.exceptions.RestNotFoundException:未找到架构 0
还有架构注册表中相应的 http 请求:
“GET /schemas/ids/0?fetchMaxId=false&subject=%3A.%3A HTTP/1.1”404 51
首先,id 0 不是我应该请求的。对于本主题,id 应为 65,并且它确实存在于架构注册表中。
其次,模式注册表中不存在 id 0。
因此,了解发生的情况的另一种尝试是检查实际消息的内容。
我大部分返回了一些不可读的 UTF-8 字节,但我可以识别一些字符,表明消息仍在写入。
出于某种原因,读取的第一个八位字节为 0,我假设消费者希望它是模式 id 来获取它并反序列化,但由于它是 0,所以无法获取任何内容。
我真的很困惑,希望收到想法来了解发生了什么。
当您的解串器(或连接转换器)与串行器不匹配时,此错误很常见。
您的生产者代码使用 StringSerializer 而不是 KafkaAvroSerializer,因此注册表中不会记录任何模式。相反,您的 Avro 对象将被 toString() 处理并以 utf8 形式发送,而不是序列化为 Avro 字节
顺便说一句,当尝试反序列化 avro 数据时,模式 id 是八位字节 1-5