我从 Kafka 主题收到二进制 Avro 文件,我必须反序列化它们。在 Kafka 收到的消息中,我可以在每条消息的开头看到一个模式。我知道最好的做法是不嵌入架构并将其与实际的 Avro 文件分开,但我无法控制生产者,也无法更改这一点。
我的代码运行在 Apache Storm 之上。首先我创建一个阅读器:
mDatumReader = new GenericDatumReader<GenericRecord>();
后来我尝试在不声明模式的情况下反序列化消息:
Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord payload = mDatumReader.read(null, decoder);
但是当消息到达时我收到错误:
Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?]
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?]
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?]
我看到的所有答案都是关于使用其他格式、更改传递到 Kafka 的消息或其他内容。我无法控制这些事情。
我的问题是,给定
bytes[]
中的消息,并在二进制消息中嵌入架构,如何在不声明架构的情况下反序列化该 Avro 文件,以便我可以读取它。
使用 DatumReader/Writer,不存在嵌入式架构之类的东西。当我第一次看到 Avro 和 Kafka 时,这也是我的误解。但 Avro Serializer 的源代码清楚地表明使用 GenericDatumWriter 时没有嵌入模式。
DataFileWriter 在文件开头写入架构,然后使用 GenericDatumWriter 添加 GenericRecords。
既然您一开始就说有一个模式,我假设您可以读取它,将其转换为 Schema 对象,然后将其传递到 GenericDatumReader(schema) 构造函数中。 了解消息是如何序列化的会很有趣。也许 DataFileWriter 用于写入 byte[] 而不是实际文件,那么您可以使用 DataFileReader 来反序列化数据吗?
添加Maven依赖
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.9.1</version>
<type>maven-plugin</type>
</dependency>
创建如下文件
{"namespace": "tachyonis.space",
"type": "record",
"name": "Avro",
"fields": [
{"name": "Id", "type": "string"},
]
}
将上面的内容另存为 Avro.avsc 到 src/main/resources 中。
在 Eclipse 或任何 IDE 中运行 > Maven 生成源代码,这些源文件创建 Avro.java 到打包文件夹 [namespace] tachyonis.space
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
KafkaConsumer<String, Avro> consumer = new KafkaConsumer<>(props);
消费者/生产者必须在同一台机器上运行。否则,您需要在 Windows/Linux 中配置主机文件,并将所有组件配置属性从 localhost 更改为映射到实际 IP 地址,以便广播给生产者/消费者。否则您会收到诸如网络连接问题之类的错误
Connection to node -3 (/127.0.0.1:9092) could not be established. Broker may not be available
我在使用 Spring Boot 时遇到了同样的问题。
不同的系统发送带有嵌入模式的消息作为有效负载的一部分:
Objavro.schema#{"type":"record","name":"CONTACTS","namespace":"my.avros","fields":[{"name":"EXTERNAL_CONTACT_ID","type":"string"},{"name":"EXTERNAL_SYSTEM","type":"string"},{"name":"CONTACT_ID","type":["null","int"]},{"name":"CREATE_TIMESTAMP","type":"string"},{"name":"UPDATE_TIMESTAMP","type":"string"}],"version":"1"}####F#[####)Ml1068GSIS##&2020-06-26T08:33:28&2021-06-25T10:47:43####F# [####)M
这就是我如何通过将字节数组包装到 FileReader 中来反序列化数据:
首先:我们跨系统共享 AVRO 架构:
{
"type": "record",
"namespace": "my.avro",
"name": "CONTACTS",
"version": "1",
"fields":
[
{
"name": "EXTERNAL_CONTACT_ID",
"type": "string"
},
{
"name": "EXTERNAL_SYSTEM",
"type": "string"
},
{
"name": "CONTACT_ID",
"type": ["null", "int"]
},
{
"name": "CREATE_TIMESTAMP",
"type": "string"
},
{
"name": "UPDATE_TIMESTAMP",
"type": "string"
}
]
}
然后我们通过 avro-maven-plugin 生成源代码
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
然后,在我们的 KafkaListener 中,我们将有效负载包装到
DataFileReader
:
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
//...
public void deSerializeAvroBinary(byte[] data) throws IOException {
DatumReader<CONTACTS> datumReader =
new SpecificDatumReader<>(CONTACTS.getClassSchema());
CONTACTS output;
try (DataFileReader<CONTACTS> dataFileReader = new DataFileReader(
new SeekableByteArrayInput(data), datumReader)) {
while (dataFileReader.hasNext()) {
output = dataFileReader.next();
// do whatever you want with the data
log.info(output.toString());
}
}
}
旁注:
在我们的应用程序属性中,我们通过以下方式配置 Kafka 消费者:
kafka:
bootstrap-servers: localhost:9093
security:
protocol: "SSL"
consumer:
group-id: "my_testgroupid"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl:
key-store-location: file:/my-consumer-cert.p12
key-store-password: *************
trust-store-location: file:/truststore.p12
trust-store-password: *************