需要将Collection转换为Tablerow的通用格式

问题描述 投票:0回答:1

我正在通过从存储桶读取CSV文件并将其存储在Big Query中进行转换

PCollection引号= .... //读取数据并进行转换

//写入BQ现有表,该表具有2列“ source”和“ quote”。

quotes.apply(
                MapElements.into(TypeDescriptor.of(TableRow.class))
                    .via(
                        (Quote elem) ->
                            new TableRow().set("source", elem.source).set("quote", elem.quote)))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(tableSpecname)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

我需要替换将PCollection转换为TableRow的代码,因为在某些情况下表列可能会有所不同,因此此核心列名称将无法使用。

google-cloud-dataflow transformation
1个回答
0
投票

您可以仅在输入ParDo和BigQuery写入步骤之间添加PCollection步骤,并添加DoFn类,以所需的方式将数据格式化为TableRow对象。

© www.soinside.com 2019 - 2024. All rights reserved.