Concurrent processing in a GCP Dataflow job

Problem description

I am trying to run three different classes concurrently using ThreadPoolExecutor, but the Dataflow job does not seem to run them in parallel; it runs each class one after another. Even if I set max_workers on the ThreadPoolExecutor, the job still scales down to a single worker once Dataflow starts running. I cannot figure out how to modify the code so that it runs all the classes at the same time. I also tried autoscaling_algorithm=NONE when running the code from the console, but it was reported as an unrecognized argument.

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import concurrent.futures

class StudentId(beam.DoFn):
    def process(self, element):
        # code block for generating unique ids
        pass

class ReadExcel(beam.DoFn):
    def process(self, element, file_path):
        # code block for reading Excel file
        pass

class DqcP(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_p function
        pass

class DqcR(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_r function
        pass

def run_pipeline(pipeline):
    return pipeline.run()

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--landing', required=True, type=str)
    parser.add_argument('--BQ_project', required=True, type=str)

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Generate unique ids
        sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))

        # Read reference Excel file
        ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                     | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))

    # Run DQC_P and DQC_R concurrently
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit DQC_P pipeline
        dqc_p_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
        dqc_p_pipeline = dqc_p_future.result()
        dqc_p_result = (ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

        # Submit DQC_R pipeline
        dqc_r_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
        dqc_r_pipeline = dqc_r_future.result()
        dqc_r_result = (ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

    # Wait for DQC_P and DQC_R pipelines to finish
    dqc_p_result.pipeline.run()
    dqc_r_result.pipeline.run()

if __name__ == '__main__':
    run()

google-cloud-platform google-cloud-dataflow apache-beam
1 Answer

Your code will not work. I don't see why you need concurrent.futures.ThreadPoolExecutor() here. Your code should define the whole pipeline and then submit it to Dataflow, which will execute it across many workers.

Your code should be changed to something like this:

with beam.Pipeline(options=pipeline_options) as pipeline:
    # Generate unique ids
    sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))

    # Read reference Excel file
    ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                     | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))

    ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))

    ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))
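
With both branches hanging off ref_excel inside a single pipeline, Dataflow is free to schedule the dqc_p and dqc_r ParDos in parallel across its workers; no Python-level threading is needed.

As for autoscaling_algorithm=NONE showing up as an unrecognized argument: it is a Dataflow worker option, so it has to reach PipelineOptions rather than your own argparse arguments. parse_known_args already forwards any flags it does not recognize into pipeline_args, or you can set the options directly in code. A minimal sketch, with placeholder project, region, and bucket names:

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values -- substitute your own project, region, and bucket.
pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-gcp-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp',
    autoscaling_algorithm='NONE',  # pin autoscaling off
    num_workers=3,                 # fixed worker count
)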
