借助这篇文章,我想创建一个梁数据流作业以将数据从 GCS 加载到 Bigquery。 GCS 存储桶中有数千个文件,所有这些文件都非常庞大,并且都是压缩的 JSONL 数据。数据格式使得无法使用日期字段创建分区表,因此我想在管道期间添加自己的分区表。
是否可以将手动字段添加到管道中,与压缩数据分开,以便当我将数据从 GCS 加载到 BigQuery 时,它会出现在最终的 BigQuery 表中?我希望能够执行此操作,而无需解压缩任何文件或对表本身执行顺序
SELECT <CRITERIA>
操作。
是的,您可以轻松地执行以下操作:
class CustomParsing(beam.DoFn):
def to_runner_api_parameter(self, unused_context):
return "beam:transforms:custom_parsing:custom_v0", None
def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
parsed = json.loads(element.decode("utf-8"))
parsed["datetimefield"] = timestamp.to_rfc3339()
yield parsed
...
with beam.Pipeline(options=pipeline_options) as p:
(
p
| "ReadFromGCS" >> beam.io.ReadFromText('gs://bucket/*.json')
| "CustomParse" >> beam.ParDo(CustomParsing())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(BIGQUERY_TABLE,
schema=BIGQUERY_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
将添加
datetimefield
列,并且可以将数据插入到BQ表中的该字段