如何覆盖 avro 命名空间和名称以反序列化到 KafkaAvroDeserializer 中的特定类

问题描述 投票:0回答:1

我正在尝试反序列化 Kafka 中使用 Avro 序列化的记录。不幸的是,生产者(不是我,并且长期以来一直为其他消费者生产)设置了一个名为“record”的模式。当我使用 avro-tool 从这个模式生成一个类时,我得到一个名为“record$”的类,大概是因为 avro 工具试图避免与 java 关键字 record(?) 发生冲突。

KafkaAvroDeserializer 现在无法将这些记录反序列化为 SpecificRecord。起初,因为它找不到要反序列化的类。我现在在我的配置中传递这个类:

      SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
    Map<String, Object> config = new HashMap<>();
    config.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
      config.put(SPECIFIC_AVRO_READER_CONFIG, true);
      config.put(AVRO_REFLECTION_ALLOW_NULL_CONFIG, true);
      config.put(AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

    if (isKey) {
      config.put(SPECIFIC_AVRO_KEY_TYPE_CONFIG, clazz.getName());
    } else {
      config.put(SPECIFIC_AVRO_VALUE_TYPE_CONFIG, clazz.getName());
    }
      deserializer = new KafkaAvroDeserializer(schemaRegistryClient, config, isKey);

但是,跑步时:

  public T deserialize(byte[] array) throws IOException {
      Object bla = deserializer.deserialize(topic, array);
    return clazz.cast(bla);
  }

我现在明白了

java.lang.ClassCastException: Cannot cast org.apache.avro.generic.GenericData$Record to COMPANY_NAMESPACE.record$

必须有一种方法来处理生产者设置他们为其模式选择的任何有效命名空间和名称,但在他们选择这样的东西的情况下,我无法弄清楚如何反序列化到 SpecificRecord。

目前,我计划重新使用 GenericRecord。

我正在使用版本 7.4.1 的融合库。

我已经单步调试了调试器,但找不到库回退到 GenericRecord 而不是我告诉它使用的类型的时刻。

java apache-kafka deserialization avro
1个回答
0
投票

我相当确定,当反序列化到 SpecificRecord 不起作用时,avro 库不会诉诸 GenericRecord 作为后备,而是抛出一个异常,表明它找不到您要反序列化的类。所以你的 clazz-class 或解串器配置一定有问题。

© www.soinside.com 2019 - 2024. All rights reserved.