Apache Beam Python pipeline - waiting for the second PCollection to be generated


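I am reading from a GCS bucket and generating a JSON file that later needs to be imported into BQ. But I get the error "OSError: No files found based on the file pattern".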

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | "ReadInputData" >> beam.io.ReadFromText(known_args.input_bucket)
         | "TransformToDict" >> beam.ParDo(TransformToDict)
         | "Filter" >> beam.Filter(lambda record: "cid" in record)
         | "WriteToGCS" >> beam.io.WriteToText(known_args.input_bucket + "merged/", file_name_suffix=".json", shard_name_template='merged-SSSSS-of-NNNNN')
         | "ReadJSONData" >> beam.io.ReadFromText(known_args.input_bucket + "merged/")
         | "LoadFileToToBQ" >> beam.io.WriteToBigQuery(
             table=table_id,
             # dataset=dataset_id,
             # project=project_id,
             custom_gcs_temp_location=known_args.input_bucket + "bqtemp/",
             method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         )
        )
Answer

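Your pipeline has multiple sources, `ReadInputData` and `ReadJSONData`, and they cannot be chained. Beam builds the DAG with each source as its own input, and these sources are read more or less at the same time; piping `WriteToGCS` into `ReadJSONData` does not make the read wait for the write. See, for example, this Stack Overflow post (and the image linked there). Since `ReadJSONData` cannot work until `WriteToGCS` has finished, your pipeline fails.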

As a solution, I would suggest reusing the `PCollection` while persisting its contents to GCS. Something like this:

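import json

import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as p:
    merged = (
        p
        | "ReadInputData" >> beam.io.ReadFromText(known_args.input_bucket)
        | "TransformToDict" >> beam.ParDo(TransformToDict)
        | "Filter" >> beam.Filter(lambda record: "cid" in record)
    )
    # Branch 1: persist the records to GCS. WriteToText writes str(element),
    # so serialize each dict to a JSON line first.
    (
        merged
        | "ToJSON" >> beam.Map(json.dumps)
        | "WriteToGCS" >> beam.io.WriteToText(known_args.input_bucket + "merged/", file_name_suffix=".json", shard_name_template='merged-SSSSS-of-NNNNN')
    )
    # Branch 2: load the same PCollection into BigQuery without re-reading the files.
    (
        merged
        | "LoadFileToToBQ" >> beam.io.WriteToBigQuery(...)  # you need to adjust this to not use FILE_LOADS
    )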
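A related detail in the `WriteToGCS` step: with its default coder, `WriteToText` writes each element's `str()`, and for a dict that is the Python repr (single quotes, `True`, `None`), which is not valid JSON even though the files get a `.json` suffix. BigQuery JSON loads expect one JSON object per line. A stdlib-only sketch of the difference, where `to_json_line` is a hypothetical helper:

```python
import json


def to_json_line(record):
    """Hypothetical helper: serialize one dict record as a single line of JSON."""
    return json.dumps(record)


record = {"cid": 42, "active": True, "note": None}

print(str(record))           # {'cid': 42, 'active': True, 'note': None}
print(to_json_line(record))  # {"cid": 42, "active": true, "note": null}

# The Python repr cannot be parsed as JSON; the json.dumps output round-trips.
try:
    json.loads(str(record))
except json.JSONDecodeError:
    print("str(record) is not valid JSON")
assert json.loads(to_json_line(record)) == record
```

In a pipeline this would be a `beam.Map(to_json_line)` (or simply `beam.Map(json.dumps)`) placed just before `WriteToText`.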

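However, I haven't worked with `WriteToText` myself. So it is possible that `merged` will be empty once the files have been written, and `LoadFileToToBQ` will fail (similar to what happens after using `beam.Map(print)`). In that case, you can use a message queue (e.g. PubSub if you are on GCP) to "communicate" between the steps. Keep in mind, however, that this puts the pipeline in streaming mode, i.e. it will run indefinitely.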
