The Avro specification states that extending a schema with an optional field is backward compatible. Unfortunately, it does not work with our binary streams, and I don't know how to fix it. I wrote a small demo application to reproduce the problem: a producer creates an instance, serializes it, and writes it to a file; a consumer reads the file and deserializes the stream. If you add an optional field to the schema and compile the schema (Avro Maven plugin), a serialized instance based on the previous schema version can no longer be deserialized. The behavior differs when using DataFileWriter/Reader; in that case it works, but we need raw binary streams because we use Kafka and the messages carry the serialized data.
How can we achieve backward compatibility?
Schema (version 1): (test.avsc)
{
"name": "Test1",
"type": "record",
"namespace": "model",
"fields": [
{ "name": "userid", "type": "string" }
]
}
Schema (version 2): (test.avsc)
{
"name": "Test1",
"type": "record",
"namespace": "model",
"fields": [
{ "name": "userid", "type": "string" },
{ "name": "department", "type": ["null", "string"], "default": null }
]
}
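As a quick sanity check (not part of the original demo), Avro's SchemaCompatibility API can confirm that a reader using V2 is able to decode data written with V1. A minimal sketch with the two schemas inlined as JSON strings:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CompatibilityCheck {
    public static void main(String[] args) {
        Schema v1 = new Schema.Parser().parse(
            "{\"name\":\"Test1\",\"type\":\"record\",\"namespace\":\"model\","
            + "\"fields\":[{\"name\":\"userid\",\"type\":\"string\"}]}");
        Schema v2 = new Schema.Parser().parse(
            "{\"name\":\"Test1\",\"type\":\"record\",\"namespace\":\"model\","
            + "\"fields\":[{\"name\":\"userid\",\"type\":\"string\"},"
            + "{\"name\":\"department\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Can a reader using V2 decode data written with V1?
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(v2, v1);
        System.out.println(result.getType()); // COMPATIBLE
    }
}
```

Note that compatibility here is a property of the schema pair, not of the serialized bytes: the check passes precisely because the new field is a nullable union with a default, which is what makes the runtime resolution below possible.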
pom.xml
[...]
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
[...]
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
[...]
ProducerToOutfile.java
package producer;
import jakarta.xml.bind.DatatypeConverter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import model.Test1;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ProducerToOutfile {
private static final Logger LOGGER = LogManager.getLogger();
private static final String FILE_NAME_SERIALISED_NOTICES = "minimalVersion.avro";
public static void main(String[] args) {
ProducerToOutfile producer = new ProducerToOutfile();
Test1 test1 = new Test1();
test1.setUserid("myuser");
try {
producer.serialiseToByteStream(test1);
LOGGER.info("### Test-Object has been serialised.");
} catch (IOException ex) {
LOGGER.error("### Failed to serialise");
throw new RuntimeException(ex);
}
}
private void serialiseToByteStream(Test1 test1) throws IOException {
byte[] bytes = null;
try {
if (test1 != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
DatumWriter<Test1> datumWriter = new SpecificDatumWriter<>(Test1.class);
datumWriter.write(test1, binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
bytes = byteArrayOutputStream.toByteArray();
LOGGER.info("serialized Test1 object ='{}'", DatatypeConverter.printHexBinary(bytes));
Files.write(Paths.get(FILE_NAME_SERIALISED_NOTICES), bytes);
}
} catch (Exception e) {
LOGGER.error("Unable to serialize Test1 ", e);
}
}
}
ConsumerFromFile.java
package consumer;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import model.Test1;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ConsumerFromFile {
private static final Logger LOGGER = LogManager.getLogger();
private static final String FILE_NAME_SERIALISED_NOTICES = "minimalVersion.avro";
//private static final String FILE_NAME_SERIALISED_NOTICES = "NotizByteStream.avro";
public static void main(String[] args) throws IOException {
ConsumerFromFile consumer = new ConsumerFromFile();
consumer.deserializeByteStream(new File(FILE_NAME_SERIALISED_NOTICES));
}
public void deserializeByteStream(File file) {
Test1 test1 = null;
LOGGER.info("### Deserialising ByteStream Test1-object");
try {
byte[] bytes = Files.readAllBytes(Paths.get(file.getPath()));
DatumReader<Test1> datumReader = new SpecificDatumReader<>(Test1.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
test1 = datumReader.read(null, decoder);
LOGGER.info("Deserialized byte stream data='{}'", test1.toString());
} catch (Exception e) {
LOGGER.error("### Unable to Deserialize bytes[] ", e);
}
}
}
Exception after trying to read the V1-serialized data with the compiled V2 schema:
2024-05-06 12:01:41.400 [main] ERROR consumer.ConsumerFromFile - ### Unable to Deserialize bytes[]
java.io.EOFException: null
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:514) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:155) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:465) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.3.jar:1.11.3]
at consumer.ConsumerFromFile.deserializeByteStream(ConsumerFromFile.java:34) [classes/:?]
at consumer.ConsumerFromFile.main(ConsumerFromFile.java:22) [classes/:?]
Solution:
The key to using Avro's schema evolution is to have both the writer schema (the schema used for serialization) and the reader schema (the schema used for deserialization) available, and to tell the reader which writer schema the data was produced with. That is also what the stack trace shows: the V2 reader expects the union index of the optional department field after userid, but the V1 stream ends there, hence the EOFException.
With a GenericDatumReader you can access the individual values of the byte stream and construct the model class by hand, which can become tedious for complex model classes. Avro also provides the ResolvingDecoder, which lets the generated model class be populated automatically. For this to work, however, any new field must be appended to the end of the schema definition, and the field order must not change between the two schema versions, because Avro fills the fields in the order in which they appear. I consider that an acceptable compromise. Removing fields between V1 and V2 is also not a problem; only the remaining fields are used.
The method used for deserialization:
public void deserializeBytes(File file) {
    Test1 test1;
    String writerSchemaString = "schema as string ...";
    Schema writerSchema = new Schema.Parser().parse(writerSchemaString);
    Schema readerSchema = Test1.getClassSchema();
    try {
        byte[] bytes = Files.readAllBytes(file.toPath());
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        ResolvingDecoder resolvingDecoder = DecoderFactory.get().resolvingDecoder(writerSchema, readerSchema, decoder);
        // Check whether new fields have only been appended to the end of the schema;
        // readFieldOrderIfDiff() returns null if the field order is unchanged.
        Schema.Field[] fieldOrder = resolvingDecoder.readFieldOrderIfDiff();
        if (fieldOrder == null) {
            DatumReader<Test1> datumReader = new SpecificDatumReader<>(Test1.class);
            test1 = datumReader.read(null, resolvingDecoder);
            LOGGER.info("Deserialized byte stream data='{}'", test1);
        } else {
            LOGGER.info("Object has to be created by means of a GenericDatumReader to deal with the changed field order");
        }
    } catch (Exception e) {
        LOGGER.error("### Unable to deserialize bytes[] ", e);
    }
}
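The resolution can also be demonstrated end to end without any generated classes (this round-trip sketch is my addition, not part of the original demo): GenericDatumReader's two-argument constructor takes the writer and the reader schema and fills new optional fields with their defaults, so a record written with V1 can be read back under V2:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class EvolutionRoundTrip {
    public static void main(String[] args) throws Exception {
        Schema v1 = new Schema.Parser().parse(
            "{\"name\":\"Test1\",\"type\":\"record\",\"namespace\":\"model\","
            + "\"fields\":[{\"name\":\"userid\",\"type\":\"string\"}]}");
        Schema v2 = new Schema.Parser().parse(
            "{\"name\":\"Test1\",\"type\":\"record\",\"namespace\":\"model\","
            + "\"fields\":[{\"name\":\"userid\",\"type\":\"string\"},"
            + "{\"name\":\"department\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Serialize with the writer (V1) schema, as the producer does.
        GenericRecord record = new GenericData.Record(v1);
        record.put("userid", "myuser");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(v1).write(record, encoder);
        encoder.flush();

        // Deserialize with writer=V1, reader=V2: resolution supplies the default.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord result = new GenericDatumReader<GenericRecord>(v1, v2).read(null, decoder);
        System.out.println(result); // {"userid": "myuser", "department": null}
    }
}
```

The same two-schema constructor exists on SpecificDatumReader, so with the generated Test1 class and the writer schema at hand, `new SpecificDatumReader<Test1>(writerSchema, readerSchema)` performs this resolution internally without the manual ResolvingDecoder handling shown above.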
I added the solution to the end of the question.