我正在尝试消费一些话题。问题是当生产者序列化数据时,他在一条消息中放入了多个对象。当我反序列化一条消息时,我只得到一个对象而丢失了其他对象。
我的架构是这样的。
` "fields": [
{
"name": "instanceuid",
"type": "string"
},
{
"name": "s",
"type": "int"
},
{
"name": "t",
"type": "string"
}
]
`
我的反序列化类来自这样的演示
`public T deserialize(String topic, byte[] data) {
try {
T result = null;
if(data == null) {
return null;
}
LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
ByteArrayInputStream in = new ByteArrayInputStream(data);
DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
List<GenericRecord> records = new ArrayList<GenericRecord>();
while(true){
try {
GenericRecord record = userDatumReader.read(null, decoder);
records.add(record);
} catch (EOFException eof) {
break;
}
}
// result = (T) userDatumReader.read(null, decoder);
LOGGER.info("deserialized data='{}'", records);
result = (T) records;
return result;
} catch (Exception ex) {
throw new SerializationException(
"Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
} finally {
}
}
}`
如果消息只包含一个对象,我的代码就可以工作。
现在我正在尝试获取我的反序列化数据,但是我得到了错误,它是这样的
java.util.ArrayList cannot be cast to org.apache.avro.specific.SpecificRecordBase"
我的类实现了Deserializer,所以反序列化方法是T类型的
那么我怎样才能返回一个带有列表类型的 T 类型呢?
我没有使用 confluent 来注册某些东西的模式。
在一条消息中放置多个对象
就 Kafka 而言,这是完全“有效”的。现在,您需要解析字节...
没有看到数据的实际字节,很难回答为什么会出错,但这里有一些提示。
KafkaAvroDeserializer
类,但也有其他实现,因此您不必自己编写。更具体地说,BinaryDecoder 不会开箱即用地处理 Confluent 序列化的生产者数据。如前所述,您需要检查原始字节,不能假设数据中只有一条 Avro 记录。T
。您可以返回一个具有 List 字段的类。你会implements Deserializer<ClassWithList>
,而不是有一个原始界面......