使用波束读取记录时重列?

问题描述 投票:0回答:1
  • 我想利用现有的谷歌代码示例(PubSubToBigQuery.java)解析StackDriver日志信息,并将其推入BigQuery
  • 问题是SD日志字段的名字之一是"@type"是一个不能用的BigQuery接受。所以,我创建了一个表中BigQuery不同的字段名(mytest的)。
  • 现在,当我运行PubSubToBigQuery.java很明显,我得到的错误消息 找不到“@type”字段。
  • 如何从"@type"重命名列名我光束代码中"mytype"
google-cloud-dataflow apache-beam
1个回答
0
投票

如果你只想把为Stackdriver日志不变至BigQuery,您可以使用内置的Stackdriver的导出功能,并创建一个接收到的BigQuery:https://cloud.google.com/logging/docs/export/

如果莫名其妙的出口是不是你是可行的,可以修改梁的转换逻辑。

在这种情况下PubSubToBigQuery.java的BigQueryIO使用的TableRow PCollection作为输入写消息至BigQuery。该PubsubMessageToTableRow PTransform确实从PubsubMessage转化成一些错误处理的TableRow。您可以添加帕尔多用自定义DOFN这改变了创建的TableRow的列名。在处理单元的方法可以是这个样子:

 @ProcessElement
 public void processElement(@Element TableRow row, OutputReceiver<TableRow> outputReceiver) {

    TableRow clone = row.clone();
    Object value = clone.get("@type");
    clone.remove("@type");
    clone.set("mytype", value);
    outputReceiver.output(clone);
}

如果使用不变PubSubToBigQuery.java我联系你可以在jsonToTableRowOut.get(TRANSFORM_OUT) PCollection在代码某处将此帕尔多围绕线323。

© www.soinside.com 2019 - 2024. All rights reserved.