I have a DoFn in my pipeline, which runs on GCP Dataflow and is supposed to do some processing for each product in parallel.
    import logging
    from datetime import datetime

    from apache_beam import DoFn, GroupBy, ParDo, Pipeline, io

    logger = logging.getLogger(__name__)


    class Step1(DoFn):
        def process(self, element):
            # Get a list of products
            for idx, item in enumerate(product_list):
                # Emit one (product, index) pair per product
                yield item, idx


    class Step2(DoFn):
        def process(self, element):
            # Get index and product
            product, index = element
            logger.info(f"::: Processing product number {index} STARTED at {datetime.now()}:::::")
            # Do some process ....
            logger.info(f"::: FINISHED product number {index} at {datetime.now()}:::::")


    with Pipeline(options=pipeline_options) as pipeline:
        results = (
            pipeline
            | "Read from PubSub" >> io.ReadFromPubSub()
            | "Product list" >> ParDo(Step1())
            | "Process Product" >> ParDo(Step2())
            | "Group data" >> GroupBy()
            ...
        )
So Step2 is supposed to run for each product in parallel. But what I actually get in the logs is:
::: Processing product number 0 STARTED at <some_time> :::::
::: FINISHED product number 0 at <some_time>:::::
::: Processing product number 1 STARTED at <some_time> :::::
::: FINISHED product number 1 at <some_time>:::::
::: Processing product number 2 STARTED at <some_time> :::::
::: FINISHED product number 2 at <some_time>:::::
::: Processing product number 3 STARTED at <some_time> :::::
::: FINISHED product number 3 at <some_time>:::::
...
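This shows that Step2 processes everything sequentially instead of in parallel, which takes a very long time to finish for a large number of products.
Am I missing something here? Aren't ParDo functions supposed to run in parallel?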
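Update
As suggested in the Apache Beam documentation, I tried the following options in PipelineOptions, and I double-checked that they were actually set on the job in GCP, but the result was the same: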
direct_num_workers=0
direct_running_mode='multi_threading'
direct_running_mode='multi_processing'
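After checking the execution graph of the Dataflow job, I realized that both steps had been fused into a single stage. This means that the input elements of the fused ("F") stage determine its parallelism. So if I feed it only one element, that element and everything Step1 produces from it are processed by a single thread.
To prevent this, I had to add a "fusion prevention" step, such as a Reshuffle, between the two steps.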
    with Pipeline(options=pipeline_options) as pipeline:
        results = (
            pipeline
            | "Read from PubSub" >> io.ReadFromPubSub()
            | "Product list" >> ParDo(Step1())
            | "Reshuffle" >> Reshuffle()
            | "Process Product" >> ParDo(Step2())
            | "Group data" >> GroupBy()
            ...
        )
After that, this is what I got in the logs:
::: Processing product number 17 STARTED at 2023-11-03 14:27:59.371644:::::
::: Processing product number 10 STARTED at 2023-11-03 14:27:59.353624:::::
::: Processing product number 4 STARTED at 2023-11-03 14:27:59.364676:::::
::: Processing product number 12 STARTED at 2023-11-03 14:27:59.327599:::::
::: Processing product number 14 STARTED at 2023-11-03 14:27:59.326156:::::
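For anyone who wants to see the effect without launching a Dataflow job, here is a toy model in plain Python. It is only a sketch and does not use Beam at all: the thread pool stands in for the worker threads, the 0.05 second sleep stands in for the real processing, and the names `fan_out`, `process`, `run_fused` and `run_with_redistribution` are made up for illustration. The assumption it encodes is the one described above: in a fused stage the unit of scheduling is the input element, so a single input element keeps its whole fan-out on one thread, whereas redistributing the fanned-out pairs lets each one be scheduled separately.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def fan_out(message):
    """Stand-in for Step1: one input message becomes many (product, index) pairs."""
    return [(product, idx) for idx, product in enumerate(message)]


def process(pair):
    """Stand-in for Step2: pretend to work and report which thread did it."""
    product, idx = pair
    time.sleep(0.05)  # placeholder for the real per-product processing
    return idx, threading.get_ident()


def run_fused(messages, pool):
    """Fused stage: each input message, with its entire fan-out, is one task."""
    def task(message):
        return [process(pair) for pair in fan_out(message)]

    return [result for chunk in pool.map(task, messages) for result in chunk]


def run_with_redistribution(messages, pool):
    """Fusion break: materialize the fan-out, then schedule every pair on its own."""
    pairs = [pair for message in messages for pair in fan_out(message)]
    return list(pool.map(process, pairs))


if __name__ == "__main__":
    messages = [list("abcdefgh")]  # a single input element, as in the question
    with ThreadPoolExecutor(max_workers=4) as pool:
        for name, runner in [("fused", run_fused),
                             ("redistributed", run_with_redistribution)]:
            start = time.perf_counter()
            results = runner(messages, pool)
            elapsed = time.perf_counter() - start
            threads = {thread_id for _, thread_id in results}
            print(f"{name}: {len(results)} items on {len(threads)} thread(s) in {elapsed:.2f}s")
```

With a single input message, the fused variant does all the work on one thread, while the redistributed variant spreads the same pairs over several threads. This only mimics the scheduling behaviour; it says nothing about how Dataflow implements Reshuffle internally.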