Apache Beam和avro:创建没有架构的数据流管道

问题描述 投票:0回答:1

我正在使用Apache beam构建数据流管道。下面是伪代码:

PCollection<GenericRecord> rows = pipeline.apply("Read Json from PubSub", <some reader>)
    .apply("Convert Json to pojo", ParDo.of(new JsonToPojo()))
    .apply("Convert pojo to GenericRecord", ParDo.of(new PojoToGenericRecord()))
    .setCoder(AvroCoder.of(GenericRecord.class, schema));

我试图摆脱在管道中设置编码器,因为在管道创建时不会知道模式(它将出现在消息中)。

我注释掉设置编码器的行,并得到一个Exception说没有配置默认编码器。我使用of方法的一个参数版本并获得以下Exception

Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
    ... 9 more

有没有办法让我们在运行时提供编码器,而不事先知道架构?

google-cloud-dataflow avro apache-beam
1个回答
1
投票

这个有可能。我推荐以下方法:

  • 不要使用GenericRecord类型的中间集合。将其保存为POJO的集合。
  • 编写一些变换来提取数据的模式,并将其作为PCollectionView<however you want to represent the schema>提供。
  • 写入BigQuery时,请通过PCollection<YourPojo>编写write().to(DynamicDestinations),写入Avro时,请将FileIO.write()writeDynamic()AvroIO.sinkViaGenericRecords()结合使用。这两个都可以从侧输入(您在上面计算)中采用动态计算的模式。
© www.soinside.com 2019 - 2024. All rights reserved.