Avro, schema evolution, backward compatibility

Problem description

The Avro specification states that extending a schema with an optional field is backward compatible. Unfortunately, this does not work for our binary streams, and I do not know how to fix it. I wrote a simple demo application to reproduce the problem: the producer creates an instance, serializes it, and saves it to a file; the consumer reads that file and deserializes the stream. If an optional field is added to the schema and the schema is compiled (Maven plugin), serialized instances based on the previous schema version can no longer be deserialized. The behavior is different when we use DataFileWriter/Reader; in that case it works, but we need raw binary streams because we use Kafka and the messages contain the serialized data.
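
For comparison, the container-file variant mentioned above works because an Avro data file embeds the writer schema in its header, so the reader can always resolve old data against the currently compiled schema. A minimal sketch of that working variant (the class and file name are hypothetical; Test1 is the class generated from the schema below):

import java.io.File;
import model.Test1;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class ContainerFileDemo {
    public static void main(String[] args) throws Exception {
        Test1 test1 = new Test1();
        test1.setUserid("myuser");

        // The container format stores the writer schema in the file header ...
        DatumWriter<Test1> writer = new SpecificDatumWriter<>(Test1.class);
        try (DataFileWriter<Test1> fileWriter = new DataFileWriter<>(writer)) {
            fileWriter.create(Test1.getClassSchema(), new File("container.avro"));
            fileWriter.append(test1);
        }

        // ... so the reader can always resolve the embedded writer schema
        // against the compiled (reader) schema, even after schema evolution.
        DatumReader<Test1> reader = new SpecificDatumReader<>(Test1.class);
        try (DataFileReader<Test1> fileReader = new DataFileReader<>(new File("container.avro"), reader)) {
            while (fileReader.hasNext()) {
                System.out.println(fileReader.next());
            }
        }
    }
}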

How can we achieve backward compatibility?

Schema (version 1): (test.avsc)

{
  "name": "Test1",
  "type": "record",
  "namespace": "model",
  "fields": [
    { "name": "userid", "type": "string" }
  ]
}

Schema (version 2): (test.avsc)

{
  "name": "Test1",
  "type": "record",
  "namespace": "model",
  "fields": [
    { "name": "userid", "type": "string" },
    { "name": "department", "type": ["null", "string"], "default": null }
  ]
}
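
Adding a nullable field with a default at the end of the record is exactly the kind of change Avro's resolution rules permit, and this can be verified programmatically with Avro's SchemaCompatibility helper. A minimal sketch, assuming the two versions of test.avsc above are saved as test-v1.avsc and test-v2.avsc (hypothetical file names):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CompatCheck {
    public static void main(String[] args) throws Exception {
        Schema v1 = new Schema.Parser().parse(new File("test-v1.avsc"));
        Schema v2 = new Schema.Parser().parse(new File("test-v2.avsc"));

        // Can a reader using V2 decode data written with V1?
        SchemaCompatibility.SchemaPairCompatibility result =
                SchemaCompatibility.checkReaderWriterCompatibility(v2, v1);

        // COMPATIBLE confirms the evolution is backward compatible.
        System.out.println(result.getType());
    }
}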

pom.xml

[...]
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
  </dependency>
[...]

  <plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <executions>
      <execution>
        <phase>generate-sources</phase>
        <goals>
          <goal>schema</goal>
        </goals>
        <configuration>
          <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
          <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        </configuration>
      </execution>
    </executions>
  </plugin>
[...]

ProducerToOutfile.java

package producer;

import jakarta.xml.bind.DatatypeConverter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import model.Test1;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ProducerToOutfile {

    private static final Logger LOGGER = LogManager.getLogger();
    private static final String FILE_NAME_SERIALISED_NOTICES = "minimalVersion.avro";

    public static void main(String[] args) {
        ProducerToOutfile producer = new ProducerToOutfile();
        Test1 test1 = new Test1();
        test1.setUserid("myuser");
        try {
            producer.serialiseToByteStream(test1);
            LOGGER.info("### Test-Object has been serialised.");
        } catch (IOException ex) {
            LOGGER.error("### Failed to serialise");
            throw new RuntimeException(ex);
        }
    }

    private void serialiseToByteStream(Test1 test1) throws IOException {
        byte[] bytes = null;
        try {
            if (test1 != null) {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
                DatumWriter<Test1> notizDatumWriter = new SpecificDatumWriter<Test1>(Test1.class);

                notizDatumWriter.write(test1, binaryEncoder);
                binaryEncoder.flush();
                byteArrayOutputStream.close();
                bytes = byteArrayOutputStream.toByteArray();
                LOGGER.info("serialized Test1 object ='{}'", DatatypeConverter.printHexBinary(bytes));

                Files.write(Paths.get(FILE_NAME_SERIALISED_NOTICES), bytes);
            }
        } catch (Exception e) {
            LOGGER.error("Unable to serialize Notizen ", e);
        }
    }
}

ConsumerFromFile.java

package consumer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import model.Test1;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ConsumerFromFile {

    private static final Logger LOGGER = LogManager.getLogger();
    private static final String FILE_NAME_SERIALISED_NOTICES = "minimalVersion.avro";
    //private static final String FILE_NAME_SERIALISED_NOTICES = "NotizByteStream.avro";
    public static void main(String[] args) throws IOException {
        ConsumerFromFile consumer = new ConsumerFromFile();
        consumer.deserializeByteStream(new File(FILE_NAME_SERIALISED_NOTICES));
    }

    public void deserializeByteStream(File file) {
        Test1   test1 = null;
        LOGGER.info("### Deserialising ByteStream Test1-object");

        try {
            byte[] bytes = Files.readAllBytes(Paths.get(file.getPath()));

            DatumReader<Test1> datumReader = new SpecificDatumReader<>(Test1.class);
            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            test1 = datumReader.read(null, decoder);
            LOGGER.info("Deserialized byte stream data='{}'", test1.toString());

        } catch (Exception e) {
            LOGGER.error("### Unable to Deserialize bytes[] ", e);
        }
    }
}

Exception after compiling schema V2 and then trying to read data that was written with schema V1 (the V2 reader expects a union index for the new department field, but the V1 stream ends right after userid, hence the EOFException in readIndex):

2024-05-06 12:01:41.400 [main] ERROR consumer.ConsumerFromFile - ### Unable to Deserialize bytes[] 
java.io.EOFException: null
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:514) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:155) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:465) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.3.jar:1.11.3]
    at consumer.ConsumerFromFile.deserializeByteStream(ConsumerFromFile.java:34) [classes/:?]
    at consumer.ConsumerFromFile.main(ConsumerFromFile.java:22) [classes/:?]

Solution:

The key to using Avro's schema evolution is having both the writer schema (the schema used for serialization) and the reader schema (the schema used for deserialization) available.

You can then tell the reader which schema to use. With a GenericDatumReader you can access the individual fields from the byte stream and construct the model class by hand. If the model class is complex, this can be tedious work.
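
A minimal sketch of that generic variant, assuming the V1 schema JSON is at hand in a String writerSchemaJson (imports as in the consumer above, plus org.apache.avro.Schema and org.apache.avro.generic.GenericDatumReader/GenericRecord):

// Resolve V1 data against the V2 (reader) schema, then copy fields by hand.
Schema writerSchema = new Schema.Parser().parse(writerSchemaJson); // V1, assumed available
Schema readerSchema = Test1.getClassSchema();                      // V2, from the generated class

byte[] bytes = Files.readAllBytes(Paths.get("minimalVersion.avro"));
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);

GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
GenericRecord record = datumReader.read(null, decoder);

Test1 test1 = new Test1();
test1.setUserid(record.get("userid").toString());
// "department" did not exist in V1; schema resolution supplies the default (null).
Object department = record.get("department");
test1.setDepartment(department == null ? null : department.toString());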

Avro also provides the ResolvingDecoder, which you can use to have the model class constructed automatically. However, for this to work you must append any new fields at the end of the schema definition and must not change the field order between the two schema versions, because Avro fills in the fields in the order in which they appear. I consider that an acceptable compromise. Removing fields between V1 and V2 is not a problem either; in that case only the fields that still exist are used.

The method used for deserialization:

public void deserializeBytes(File file) {
    // Additional imports needed: java.io.ByteArrayInputStream,
    // org.apache.avro.Schema, org.apache.avro.io.ResolvingDecoder
    Test1 test1;
    String writerSchemaString = "schema as string ..."; // the V1 schema JSON the data was written with
    Schema writerSchema = new Schema.Parser().parse(writerSchemaString);
    Schema readerSchema = Test1.getClassSchema(); // the compiled V2 schema

    try {
        byte[] bytes = Files.readAllBytes(Paths.get(file.getPath()));

        Decoder decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(bytes), null);
        ResolvingDecoder resolvingDecoder = DecoderFactory.get().resolvingDecoder(writerSchema, readerSchema, decoder);

        // Check whether new fields have only been appended to the end of the schema.
        Schema.Field[] fieldOrder = resolvingDecoder.readFieldOrderIfDiff();
        if (fieldOrder == null) {
            DatumReader<Test1> datumReader = new SpecificDatumReader<>(Test1.class);
            test1 = datumReader.read(null, resolvingDecoder);
        } else {
            LOGGER.info("Object has to be created by means of GenericDatumReader to deal with changed field order");
        }
    } catch (Exception e) {
        LOGGER.error("### Unable to Deserialize bytes[] ", e);
    }
}
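
For the common case above (fieldOrder == null), the same resolution is available with less machinery: SpecificDatumReader can be constructed with the writer and reader schemas directly and builds the resolving decoder internally; since resolution matches fields by name, it also copes with reordered fields. A minimal sketch, under the same assumption that the V1 schema JSON is at hand in writerSchemaString:

// Let SpecificDatumReader perform the writer-vs-reader resolution itself.
Schema writerSchema = new Schema.Parser().parse(writerSchemaString); // V1
Schema readerSchema = Test1.getClassSchema();                        // compiled V2

byte[] bytes = Files.readAllBytes(Paths.get(file.getPath()));
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);

DatumReader<Test1> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
Test1 test1 = datumReader.read(null, decoder); // "department" resolves to its default (null)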

1 Answer

I have added the solution to the end of the question.
