我正在通过从存储桶读取CSV文件并将其存储在Big Query中进行转换
PCollection引号= .... //读取数据并进行转换
//写入BQ现有表,该表具有2列“ source”和“ quote”。
quotes.apply(
MapElements.into(TypeDescriptor.of(TableRow.class))
.via(
(Quote elem) ->
new TableRow().set("source", elem.source).set("quote", elem.quote)))
.apply(
BigQueryIO.writeTableRows()
.to(tableSpecname)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
我需要替换将PCollection转换为TableRow的代码,因为在某些情况下表列可能会有所不同,因此此核心列名称将无法使用。
您可以仅在输入ParDo
和BigQuery写入步骤之间添加PCollection
步骤,并添加DoFn
类,以所需的方式将数据格式化为TableRow
对象。