在Tour of Beam关于组合函数的文档中,它说了以下内容:
应用组合变换时,必须提供包含组合元素或值的逻辑的函数。组合函数应该是可交换和关联的,因为该函数不一定对具有给定键的所有值调用一次。由于输入数据(包括值集合)可能分布在多个工作人员中,因此可能会多次调用组合函数以对值集合的子集执行部分组合
对于组合函数,上面强调的短语似乎是一个“巨大”的限制,会使创建函数的方式变得非常复杂。但在教程的下一部分,有一个例子打破了这个限制,据我了解: 教程中的平均器示例:
class AverageFn(beam.CombineFn):
def create_accumulator(self):
return (0.0, 0)
def add_input(self, sum_count, input):
(sum, count) = sum_count
return sum + input, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
def extract_output(self, sum_count):
(sum, count) = sum_count
return sum / count if count else float('NaN')
教程说:
本地累加器跟踪值的运行总和(最终平均除法的分子值)以及到目前为止求和的值的数量(分母值)。
它可以以分布式方式被调用任意多次为了解释我的问题/困惑,让我创建一个带有值的玩具示例。
[5, 15, 1004, -3, -8, 11]
假设
PCollection
分为两个“实例”:
[15, 1004, 11]
[5, -3, -8]
据我理解文档,它说第一个子集的平均值,[15, 1004, 11]
,可能会计算两次,而第二个子集仅计算一次。
228
的表示,那么将创建
171
而不是 [15, 1004, 11]
的平均值。
这显然不是所发生的事情,那我错过了什么?我知道该类提供了完全可交换和关联的函数,
但不是幂等的函数,这就是说它们 100% 正确的含义,即使“不一定只调用一次”。 理解这一点很重要——组合器函数可以被调用多少次? -- 因为:
通过查看提供的示例,我
Not necessarily invoked exactly once on all values
表示它被调用
[5, 15, 1004, -3, -8, 11]
)捆绑(在您的示例中为两个捆绑
[15, 1004, 11]
[5, -3, -8]
)这并不意味着(在这种情况下)传入的数据被处理两次。此外,
it may be called any number of times in a distributed fashion
意味着捆绑包可以采用任何形式,并由运行器在运行时定义,即它可以是的任何排列
[5, 15, 1004, -3, -8, 11]
[5]
[15, 1004, -3, -8, 11]
[5, 15]
,[1004, -3, -8, 11]
[5]
、[15]
[1004]
[-3]
、[-8]
、[11]
这就是为什么它说[...]因为输入数据(包括值集合)可能是 分布在多个工作人员中,组合函数可能是
对子集
进行部分组合 超值收藏