我可以将压缩的 jsonl 数据从 GCS 加载到 BigQuery 并使用 DataFlow 添加额外的日期列吗

问题描述 投票:0回答:1

借助这篇文章,我想创建一个梁数据流作业以将数据从 GCS 加载到 Bigquery。 GCS 存储桶中有数千个文件,所有这些文件都非常庞大,并且都是压缩的 JSONL 数据。数据格式使得无法使用日期字段创建分区表,因此我想在管道期间添加自己的分区表。

是否可以将手动字段添加到管道中,与压缩数据分开,以便当我将数据从 GCS 加载到 BigQuery 时,它会出现在最终的 BigQuery 表中?我希望能够执行此操作,而无需解压缩任何文件或对表本身执行顺序

SELECT <CRITERIA>
操作。

google-bigquery pipeline google-cloud-dataflow gcs beam
1个回答
0
投票

是的,您可以轻松地执行以下操作:

class CustomParsing(beam.DoFn):
    def to_runner_api_parameter(self, unused_context):
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        parsed = json.loads(element.decode("utf-8"))
        parsed["datetimefield"] = timestamp.to_rfc3339()
        yield parsed

...

 with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromGCS" >> beam.io.ReadFromText('gs://bucket/*.json')
            | "CustomParse" >> beam.ParDo(CustomParsing())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(BIGQUERY_TABLE,
                                                 schema=BIGQUERY_SCHEMA,
                                                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )

将添加

datetimefield
列,并且可以将数据插入到BQ表中的该字段

© www.soinside.com 2019 - 2024. All rights reserved.