如何使用嵌入式模式从 Kafka 反序列化 Avro

问题描述 投票:0回答:3

我从 Kafka 主题收到二进制 Avro 文件,我必须反序列化它们。在 Kafka 收到的消息中,我可以在每条消息的开头看到一个模式。我知道最好的做法是不嵌入架构并将其与实际的 Avro 文件分开,但我无法控制生产者,也无法更改这一点。

我的代码运行在 Apache Storm 之上。首先我创建一个阅读器:

mDatumReader = new GenericDatumReader<GenericRecord>();

后来我尝试在不声明模式的情况下反序列化消息:

Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord payload = mDatumReader.read(null, decoder);

但是当消息到达时我收到错误:

Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?]
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?]
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?]

我看到的所有答案都是关于使用其他格式、更改传递到 Kafka 的消息或其他内容。我无法控制这些事情。

我的问题是,给定

bytes[]
中的消息,并在二进制消息中嵌入架构,如何在不声明架构的情况下反序列化该 Avro 文件,以便我可以读取它。

apache-kafka apache-storm avro
3个回答
2
投票

使用 DatumReader/Writer,不存在嵌入式架构之类的东西。当我第一次看到 Avro 和 Kafka 时,这也是我的误解。但 Avro Serializer 的源代码清楚地表明使用 GenericDatumWriter 时没有嵌入模式。

DataFileWriter 在文件开头写入架构,然后使用 GenericDatumWriter 添加 GenericRecords。

既然您一开始就说有一个模式,我假设您可以读取它,将其转换为 Schema 对象,然后将其传递到 GenericDatumReader(schema) 构造函数中。 了解消息是如何序列化的会很有趣。也许 DataFileWriter 用于写入 byte[] 而不是实际文件,那么您可以使用 DataFileReader 来反序列化数据吗?


0
投票
  1. 添加Maven依赖

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.9.1</version>
        <type>maven-plugin</type>
    </dependency>
    
  2. 创建如下文件

     {"namespace": "tachyonis.space",
       "type": "record",
       "name": "Avro",
       "fields": [
          {"name": "Id", "type": "string"},
        ]
      }
    
  3. 将上面的内容另存为 Avro.avsc 到 src/main/resources 中。

  4. 在 Eclipse 或任何 IDE 中运行 > Maven 生成源代码,这些源文件创建 Avro.java 到打包文件夹 [namespace] tachyonis.space

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
    KafkaConsumer<String, Avro> consumer = new KafkaConsumer<>(props);
    
  5. 消费者/生产者必须在同一台机器上运行。否则,您需要在 Windows/Linux 中配置主机文件,并将所有组件配置属性从 localhost 更改为映射到实际 IP 地址,以便广播给生产者/消费者。否则您会收到诸如网络连接问题之类的错误

    Connection to node -3 (/127.0.0.1:9092) could not be established. Broker may not be available
    

0
投票

我在使用 Spring Boot 时遇到了同样的问题。

不同的系统发送带有嵌入模式的消息作为有效负载的一部分:

Objavro.schema#{"type":"record","name":"CONTACTS","namespace":"my.avros","fields":[{"name":"EXTERNAL_CONTACT_ID","type":"string"},{"name":"EXTERNAL_SYSTEM","type":"string"},{"name":"CONTACT_ID","type":["null","int"]},{"name":"CREATE_TIMESTAMP","type":"string"},{"name":"UPDATE_TIMESTAMP","type":"string"}],"version":"1"}####F#[####)Ml1068GSIS##&2020-06-26T08:33:28&2021-06-25T10:47:43####F# [####)M

这就是我如何通过将字节数组包装到 FileReader 中来反序列化数据:

首先:我们跨系统共享 AVRO 架构:

{ 
  "type": "record",
  "namespace": "my.avro",
  "name": "CONTACTS",
  "version": "1",
  "fields": 
  [
    {
      "name": "EXTERNAL_CONTACT_ID",
      "type": "string"
    },
    {
      "name": "EXTERNAL_SYSTEM",
      "type": "string"
    },
    {
      "name": "CONTACT_ID",
      "type": ["null", "int"]
    },
    {
      "name": "CREATE_TIMESTAMP",
      "type": "string"
    },
    {
      "name": "UPDATE_TIMESTAMP",
      "type": "string"
    }
  ]
}

然后我们通过 avro-maven-plugin 生成源代码

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

然后,在我们的 KafkaListener 中,我们将有效负载包装到

DataFileReader
:

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

//...

  public void deSerializeAvroBinary(byte[] data) throws IOException {
    DatumReader<CONTACTS> datumReader =
        new SpecificDatumReader<>(CONTACTS.getClassSchema());

    CONTACTS output;

    try (DataFileReader<CONTACTS> dataFileReader = new DataFileReader(
        new SeekableByteArrayInput(data), datumReader)) {
      while (dataFileReader.hasNext()) {
        output = dataFileReader.next();
        // do whatever you want with the data
        log.info(output.toString());
      }
    }
  }

旁注:

在我们的应用程序属性中,我们通过以下方式配置 Kafka 消费者:

  kafka:
    bootstrap-servers: localhost:9093
    security:
      protocol: "SSL"
    consumer:
      group-id: "my_testgroupid"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      ssl:
        key-store-location: file:/my-consumer-cert.p12
        key-store-password: *************
        trust-store-location: file:/truststore.p12
        trust-store-password: *************
© www.soinside.com 2019 - 2024. All rights reserved.