我正在使用Apache beam构建数据流管道。下面是伪代码:
PCollection<GenericRecord> rows = pipeline.apply("Read Json from PubSub", <some reader>)
.apply("Convert Json to pojo", ParDo.of(new JsonToPojo()))
.apply("Convert pojo to GenericRecord", ParDo.of(new PojoToGenericRecord()))
.setCoder(AvroCoder.of(GenericRecord.class, schema));
我试图摆脱在管道中设置编码器,因为在管道创建时不会知道模式(它将出现在消息中)。
我注释掉设置编码器的行,并得到一个Exception
说没有配置默认编码器。我使用of
方法的一个参数版本并获得以下Exception
:
Not a Specific class: interface org.apache.avro.generic.GenericRecord
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 9 more
有没有办法让我们在运行时提供编码器,而不事先知道架构?
这个有可能。我推荐以下方法:
PCollectionView<however you want to represent the schema>
提供。PCollection<YourPojo>
编写write().to(DynamicDestinations)
,写入Avro时,请将FileIO.write()
或writeDynamic()
与AvroIO.sinkViaGenericRecords()
结合使用。这两个都可以从侧输入(您在上面计算)中采用动态计算的模式。