是否有一种方法可以在Beam的ParDo转换中创建用于写入Parquet文件的SpecificRecord列表?

问题描述 投票:0回答:1

我正在尝试用Beam / Java编写Dataflow作业,以处理来自Pub / Sub并写入Parquet的一系列事件。 Pub / Sub中的事件采用JSON格式,每个事件都可以生成一个或多个行。我能够编写一个非常简单的示例,编写一个只返回1条记录的ParDo转换。 ParDo看起来像这样

    static class GenerateRecords extends DoFn<String, GenericRecord> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            final GenericData.Record record = new GenericData.Record(schema);
            String msg = context.element();

            com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);


            context.output(pRecord);
        }
    }

和管道的写入部分

                .apply("Write to file",
                FileIO.<GenericRecord>
                        write()
                        .via(
                                ParquetIO.sink(schema)
                                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        )
                        .to(options.getOutputDirectory())
                        .withNumShards(options.getNumShards())
                        .withSuffix("pfile")
                );

我的问题是,我如何概括这个ParDo转换以返回记录列表?我尝试了List,但是不起作用,ParquetIO.sink(schema)吠叫“无法通过以下方法解析方法”。

java apache-beam dataflow
1个回答
0
投票

您可以根据需要多次调用context.output()中的DoFn。因此,如果您知道在什么情况下需要发出多个记录的业务逻辑,则只需为每个输出记录调用context.output(record)。它比拥有PCollection的容器更为简单。

PS:顺便说一句,我有一个简单的示例,说明如何用GenericRecordParquetIO编写AvroCoder可能会有所帮助。

© www.soinside.com 2019 - 2024. All rights reserved.