解析多个 Avro 记录的字节数组

问题描述 投票:0回答:1

我正在尝试消费一些话题。问题是当生产者序列化数据时,他在一条消息中放入了多个对象。当我反序列化一条消息时,我只得到一个对象而丢失了其他对象。

我的架构是这样的。

` "fields": [
        {
            "name": "instanceuid",
            "type": "string"
        },
        {
            "name": "s",
            "type": "int"
        },
        {
            "name": "t",
            "type": "string"
        }
    ]
`

我的反序列化类来自这样的演示

`public T deserialize(String topic, byte[] data) {
        try {
            T result = null;

            if(data == null) {
                return null;
            }
            LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            ByteArrayInputStream in = new ByteArrayInputStream(data);
            DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
            List<GenericRecord> records = new ArrayList<GenericRecord>();
                 while(true){
                         try {
                                 GenericRecord record = userDatumReader.read(null, decoder);
                                 records.add(record);
                             } catch (EOFException eof) {
                                break;
                            }
                     }

           // result = (T) userDatumReader.read(null, decoder);
            LOGGER.info("deserialized data='{}'", records);
             result = (T) records;
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        } finally {

        }
    }
}`

如果消息只包含一个对象,我的代码就可以工作。

现在我正在尝试获取我的反序列化数据,但是我得到了错误,它是这样的

java.util.ArrayList cannot be cast to org.apache.avro.specific.SpecificRecordBase"

我的类实现了Deserializer,所以反序列化方法是T类型的
那么我怎样才能返回一个带有列表类型的 T 类型呢?

我没有使用 confluent 来注册某些东西的模式。

java apache-kafka deserialization avro
1个回答
0
投票

在一条消息中放置多个对象

就 Kafka 而言,这是完全“有效”的。现在,您需要解析字节...

没有看到数据的实际字节,很难回答为什么会出错,但这里有一些提示。

  1. 如果你有模式,你应该使用 Maven 插件 创建一个类,而不是使用 GenericRecord。
  2. Kafka 通常与 Schema Registry 一起使用,例如 Confluent 提供的那种。 Confluent 提供了自己的
    KafkaAvroDeserializer
    类,但也有其他实现,因此您不必自己编写。更具体地说,BinaryDecoder 不会开箱即用地处理 Confluent 序列化的生产者数据。如前所述,您需要检查原始字节,不能假设数据中只有一条 Avro 记录。
  3. 您的架构没有 Avro 数组类型,因此您已经知道您的解析器正在尝试使用 ArrayLists 做一些事情
  4. 您不能退回
    T
    。您可以返回一个具有 List 字段的类。你会
    implements Deserializer<ClassWithList>
    ,而不是有一个原始界面......
© www.soinside.com 2019 - 2024. All rights reserved.