SideInput I / O会降低性能

问题描述 投票:0回答:1

我正在使用Python SDK 2.15.0构建数据流管道。在此管道中,我需要在管道的多个阶段将附加数据连接到每个元素。

所有这些附加数据都是从Google云端存储(用于Dataflow和GCS存储桶的同一区域)的avro文件中读取的,通过使用map函数组织为键值元组,然后使用pvalue作为侧面输入传递给DoFn。 AsDict()。在管道执行期间,侧面输入数据将不会更改。

第一次连接(侧面输入大小〜1 MB)非常顺利。但是,第二个联接确实遭受不良性能的困扰。它的侧面输入大小约为50 MB。

数据流执行图清楚地显示了导致性能下降的原因:我的ParDo步骤所消耗的时间大约有90%用于读取侧面输入。即使我仅使用四个工作节点,从sideinput读取的数据量也超出了其实际大小几个数量级。

我有什么办法可以防止这种情况发生?我是否需要以某种方式配置工作者缓存大小?用我的DoFn的设置方法准备其他数据,而不是将其作为sideinput传递会更好吗?

这是我准备侧面输入的方式:

sideinput_1 = pvalue.AsDict(p | "Read side input data 1" >> beam.io.ReadFromAvro("gs:/bucket/small_file.avro",0,False,True) \
                              | "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"],x["VALUE"])))

# Preparing data for later join
sideinput_2 = pvalue.AsDict(p | "Read side input data 2" >> beam.io.ReadFromAvro("gs://bucket/bigger_file.avro",0,False,True) \
                              | "Prepare side input data 2" >> beam.Map(lambda x: ((x["KEYCOL1"],x["KEYCOL2"],x["KEYCOL3"]),x)))

使用侧面输入:


matching = p | "Read address data" >> beam.io.Read(beam.io.BigQuerySource(query=sql_addr, use_standard_sql=True)) \
                 | "Join w/ sideinput1" >> beam.ParDo(Join1(), sideinput_1 ).with_outputs('unmatched', main='matched')                                                                                

result = matching["matched"] | "Join Sideinput 2" >> beam.ParDo(Join2(), sideinput_2 )

DoFn处理方法只在侧面输入中包含键的查找,并根据是否匹配来向元素添加一些其他数据。

python dataflow beam
1个回答
0
投票

好吧,一个月后再进行讨论,根据获得的经验,让我再试一下:

我很确定侧面输入的性能问题归结为内存交换问题。在管道中,还有其他一些非常相似的联接,但侧面输入要小得多。它们以合理的时间运行。但是,所有这些连接的比率(IO字节/ sideinput字节)大致相等。

当我将实施从具有SideInput的ParDo切换为CoGroupByKey Transform时,受影响的联接的性能提高了几个数量级。

关于侧面输入的大小,以及何时使用SideInput与DoFn相比,何时更喜欢CoGroupByKey的说法:

great blog entry "Guide to common Cloud Dataflow use-case patterns"指出,对于流式传输最多100 MB,批处理模式下1 GB的SideInputs可以使用ParDo:

注意:如果可能,请对任何一个连接表实际上很小的活动使用SideInputs-在流模式下约为100MB,而在批处理模式下则为1GB。这样做会更好。

我想没有适合每种情况的一般阈值。可能在很大程度上取决于您的管道,机器类型和工人人数等。就我而言,由于管道的高度复杂性,我认为阈值较低。它包含约40个转换,包括多个联接。

因此,如果在使用ParDo和Sideinput进行连接时遇到相同的问题,则可能需要尝试CoGroupByKey-Transform。

© www.soinside.com 2019 - 2024. All rights reserved.