Apache Beam pipeline steps are not running in parallel


I have a DoFn in my pipeline, running on GCP Dataflow, that is supposed to do some processing for each product in parallel.

class Step1(DoFn):
    def process(self, element):
        # Get a list of products (from the PubSub message)
        product_list = ...
        for idx, product in enumerate(product_list):
            yield product, idx

class Step2(DoFn):
    def process(self, element):
        product, index = element  # unpack the (product, index) pair from Step1
        logger.info(f"::: Processing product number {index} STARTED at {datetime.now()}:::::")
        # Do some processing ....
        logger.info(f"::: FINISHED product number {index} at {datetime.now()}:::::")
        yield element

with Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | "Read from PubSub" >> io.ReadFromPubSub()
        | "Product list"     >> ParDo(Step1())
        | "Process Product"  >> ParDo(Step2())
        | "Group data" >> GroupBy()
        ...
    )

So Step2 is supposed to run in parallel for each product. But what I actually get in the logs is:

::: Processing product number 0 STARTED at <some_time> :::::
::: FINISHED product number 0 at <some_time>:::::
::: Processing product number 1 STARTED at <some_time> :::::
::: FINISHED product number 1 at <some_time>:::::
::: Processing product number 2 STARTED at <some_time> :::::
::: FINISHED product number 2 at <some_time>:::::
::: Processing product number 3 STARTED at <some_time> :::::
::: FINISHED product number 3 at <some_time>:::::
...

This shows that Step2 runs everything sequentially rather than in parallel, which takes far too long to finish for a large number of products.

Am I missing something here? Isn't ParDo supposed to run in parallel?

Update

As suggested by the Apache Beam documentation, I tried the following options in PipelineOptions, and I double-checked that they were actually set on the job in GCP, but the result was the same:

  • direct_num_workers=0
  • direct_running_mode='multi_threading'
  • direct_running_mode='multi_processing'
Tags: python, google-cloud-platform, google-cloud-dataflow, apache-beam
1 Answer

After inspecting the execution graph of the Dataflow job, I realized that both steps had been fused into a single stage. This means the parallelism of the fused stage is determined by the number of elements at its input: if I feed it only a single element, it gets processed by a single thread.
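The effect can be sketched outside Beam with the standard library (an analogy only, not the Beam API): with fusion, the single input element carries the whole product list, so one worker processes everything in order; once each product is its own element, a pool can fan the work out.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def process_product(idx):
    # Stand-in for Step2's per-product work; records the handling thread.
    return idx, threading.current_thread().name

def fused_stage(element):
    # Fused analogy: one input element -> all products handled serially
    # by whichever single thread received that element.
    product_list = list(range(8))  # hypothetical products derived from `element`
    return [process_product(i) for i in product_list]

serial = fused_stage("pubsub-message")

# After a "reshuffle": each product is a separate element, so a pool
# of workers can pick them up concurrently.
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(process_product, range(8)))

# Both strategies produce the same products; only the execution differs.
assert sorted(i for i, _ in serial) == sorted(i for i, _ in parallel)
```

In the fused case every product is tagged with the same thread name, which mirrors the serial log output in the question.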

To prevent this, I had to apply a fusion-prevention step, such as a Reshuffle between the two steps:

with Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | "Read from PubSub" >> io.ReadFromPubSub()
        | "Product list"     >> ParDo(Step1())
        | "Reshuffle" >> Reshuffle()
        | "Process Product"  >> ParDo(Step2())
        | "Group data" >> GroupBy()
        ...
    )

With that in place, I got these logs:

::: Processing product number 17 STARTED at 2023-11-03 14:27:59.371644:::::
::: Processing product number 10 STARTED at 2023-11-03 14:27:59.353624:::::
::: Processing product number 4 STARTED at 2023-11-03 14:27:59.364676:::::
::: Processing product number 12 STARTED at 2023-11-03 14:27:59.327599:::::
::: Processing product number 14 STARTED at 2023-11-03 14:27:59.326156:::::
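Concurrency can also be verified directly from such logs with a small helper (hypothetical, not part of the job): if any product STARTS while another is still running, the elements really were processed in parallel.

```python
from datetime import datetime

def runs_overlap(events):
    """events: (kind, timestamp) pairs parsed from the worker logs,
    where kind is "STARTED" or "FINISHED". Returns True if at least
    two products were ever in flight at the same time."""
    running = 0
    for kind, _ts in sorted(events, key=lambda e: e[1]):
        if kind == "STARTED":
            running += 1
            if running > 1:
                return True
        else:
            running -= 1
    return False

fmt = "%Y-%m-%d %H:%M:%S.%f"
sample = [  # timestamps taken from the parallel logs above
    ("STARTED", datetime.strptime("2023-11-03 14:27:59.326156", fmt)),
    ("STARTED", datetime.strptime("2023-11-03 14:27:59.327599", fmt)),
    ("FINISHED", datetime.strptime("2023-11-03 14:28:01.000000", fmt)),
    ("FINISHED", datetime.strptime("2023-11-03 14:28:02.000000", fmt)),
]
assert runs_overlap(sample)  # two products in flight at once
```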