写入Avro文件时进行架构更新

问题描述 投票:0回答:1

上下文:我们有一个数据流作业,可将PubSub消息转换为Avro GenericRecords,并将其以“ .avro”的形式写入GCS。 PubSub消息和GenericRecords之间的转换需要一个模式。该架构每周更改一次,仅添加字段。我们希望能够在不更新数据流作业的情况下更新字段。

我们做了什么:我们采纳了this post的建议,并创建了一个Guava缓存,该缓存每分钟刷新一次。刷新功能将从GCS中提取架构。然后,我们有FileIO.write查询Guava缓存以获取最新的架构,并使用该架构将元素转换为GenericRecord。我们还将FileIO.write输出输出到Avro接收器,该接收器也使用该架构创建。

代码如下:

genericRecordsAsByteArrays.apply(FileIO.<byte[]>write()
    .via(fn((input, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Descriptors.Descriptor paymentRecordFd =
              (Descriptors.Descriptor) schemaInfo.get(DESCRIPTOR_KEY);
          DynamicMessage paymentRecordMsg = DynamicMessage.parseFrom(paymentRecordFd, input);
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);

          //From concrete PaymentRecord bytes to DynamicMessage
          try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
            ProtobufDatumWriter<DynamicMessage> pbWriter = new ProtobufDatumWriter<>(schema);
            pbWriter.write(paymentRecordMsg, encoder);
            encoder.flush();

            // From dynamic message to GenericRecord
            byte[] avroContents = output.toByteArray();
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroContents, null);
            return reader.read(null, decoder);
          }
        }, requiresSideInputs()),
        fn((output, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
          return AvroIO.sink(schema).withCodec(CodecFactory.snappyCodec());
        }, requiresSideInputs()))
    .withNumShards(5)
    .withNaming(new PerWindowFilenames(baseDir, ".avro"))
    .to(baseDir.toString()));

我的问题:

  1. 当我们写入一个Avro文件时,会发生什么,但是突然之间发生架构更新,现在我们将新架构写入到使用旧架构创建的Avro文件中?
  2. Dataflow看到新架构时会启动新文件吗?
  3. 数据流在创建新文件之前是否会忽略新架构和其他字段?

每个Avro文件在文件的开头都有其自己的架构,因此我不确定预期的行为是什么。

java google-cloud-dataflow avro apache-beam dataflow
1个回答
0
投票

现在我们正在将新架构写入使用旧架构创建的Avro文件中

不可能。每个Avro文件只有一个架构。如果更改,根据定义,您将写入一个新文件。

我怀疑数据流会忽略字段。

© www.soinside.com 2019 - 2024. All rights reserved.