I am reading from a GCS bucket and generating a JSON file that later needs to be imported into BQ. But I am getting the error "OSError: No files found based on the file pattern".
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | "ReadInputData" >> beam.io.ReadFromText(known_args.input_bucket)
     | "TransformToDict" >> beam.ParDo(TransformToDict)
     | "Filter" >> beam.Filter(lambda record: "cid" in record)
     | "WriteToGCS" >> beam.io.WriteToText(known_args.input_bucket + "merged/", file_name_suffix=".json", shard_name_template='merged-SSSSS-of-NNNNN')
     | "ReadJSONData" >> beam.io.ReadFromText(known_args.input_bucket + "merged/")
     | "LoadFileToToBQ" >> beam.io.WriteToBigQuery(
         table=table_id,
         # dataset=dataset_id,
         # project=project_id,
         custom_gcs_temp_location=known_args.input_bucket + "bqtemp/",
         method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
       )
    )
Your pipeline contains multiple sources, ReadInputData and ReadJSONData, which cannot be chained like this. Beam builds the DAG with each source as an input, and these sources are read more or less simultaneously. See for example this stackoverflow post (and the linked image).

Since ReadJSONData cannot work until WriteToGCS has finished, your pipeline fails.
As a solution, I suggest reusing the content you write to GCS by keeping it as a PCollection. Something like this:
with beam.Pipeline(options=pipeline_options) as p:
    merged = (
        p
        | "ReadInputData" >> beam.io.ReadFromText(known_args.input_bucket)
        | "TransformToDict" >> beam.ParDo(TransformToDict)
        | "Filter" >> beam.Filter(lambda record: "cid" in record)
    )
    (
        merged
        | "WriteToGCS" >> beam.io.WriteToText(known_args.input_bucket + "merged/", file_name_suffix=".json", shard_name_template='merged-SSSSS-of-NNNNN')
    )
    (
        merged
        | "LoadFileToToBQ" >> beam.io.WriteToBigQuery(...)  # you need to adjust this to not use FILE_LOADS
    )
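For the adjusted sink, one option would be streaming inserts, which write rows to BigQuery directly without intermediate files on GCS. This is only a sketch under assumptions, not a tested solution: it assumes table_id is the same table spec as in your original code, and that you can provide a table_schema (a schema is needed for CREATE_IF_NEEDED when the table does not exist yet):

    (
        merged
        | "LoadFileToToBQ" >> beam.io.WriteToBigQuery(
            table=table_id,
            schema=table_schema,  # assumed: required by CREATE_IF_NEEDED if the table is missing
            method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )

With STREAMING_INSERTS there is no custom_gcs_temp_location, so the BQ load no longer depends on files in the bucket.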
However, I have not worked with WriteToText yet. So it is possible that merged ends up empty once the files are written, and LoadFileToToBQ would then fail (similar to using beam.Map(print)). In that case, you can use a message queue (e.g. PubSub if you are on GCP) to "communicate" between the steps. Keep in mind, though, that this puts the pipeline into streaming mode, i.e. it will run indefinitely.