如何在 apache beam 中顺序运行单个管道来写入和读取数据

问题描述 投票:0回答:1

我使用 apache beam 和 python 在数据流(gcp)上使用它来加载和转换数据巴克斯。

我有一个管道,它分为不同的部分。 第一部分,写入bigquery。 第二部分读取第一步写入的数据。

我使用相同的

p
管道来运行它,因此两个部分将同时运行。我必须在一个管道中实现顺序运行。我尝试添加一些标志来触发第二部分,但由于 WriteToBigQuery 不返回任何内容,并且我无法使其可迭代,所以我无法实现它。

p = beam.Pipeline(options=opts) part_1 = ( p | "F1: Read data 1" >> beam.io.ReadFromText( entrada, skip_header_lines=True ) | "F1: Transform 1" >> beam.Map(format_date) | "F1: Transform 2" >> Map(make_row) | "F1: Write into BQ" >> WriteToBigQuery( output_table, schema=table_schema, write_disposition=BigQueryDisposition.WRITE_APPEND, create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, additional_bq_parameters={ "timePartitioning": {"type": "DAY"}, "clustering": {"fields": ["programcode"]}, }, custom_gcs_temp_location=temp_location ) )
执行此操作后,我读取写入的数据(

我使用的查询还包括除此之外的其他表),这样:

part_2 = ( p | "F2: Read from BigQuery from first step" >> beam.io.ReadFromBigQuery( query=query_raw, use_standard_sql=True, gcs_location=temp_location, project=project_id ) )
最后运行定义的管道:

result = p.run() result.wait_until_finish()
如果我将part_1添加为part_2中的输入,如下所示:

part_2 = ( part_1 | "F2: Read from BigQuery"...
输出如下,因为part_1输出:

AttributeError: Error trying to access nonexistent attribute `0` in write result. Please see __documentation__ for available attributes.
任何实现这一目标的想法或例子都欢迎讨论。

python google-cloud-dataflow apache-beam
1个回答
0
投票
你可以轻松地做这样的事情:

def run_pipeline(project, region): options = PipelineOptions( runner = "DataflowRunner" , project = project , region = region ) with beam.Pipeline(options=options) as p1: ( p1 | "Create Pipeline" >> beam.Create(["This","is","my","main","Apache Beam","pipeline"]) | "Printing Elements" >> beam.Map(print) ) with beam.Pipeline(options=options) as p2: ( p2 | "SecondPipeline" >> beam.Create([]) | "Print" >> beam.Map(customFunction) )
一旦 p1 完成,p2 将运行

© www.soinside.com 2019 - 2024. All rights reserved.