我是 Spark 的新手,并试图了解像reduce、aggregate 等函数是如何工作的。 在查看 RDD.aggregate() 时,我尝试将 ZeroValue 更改为除恒等式之外的其他值(0 表示加法,1 表示乘法),以了解内部是如何工作的。
这是我尝试过的:
# with identity zeroValue : (0, 0)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
l = sc.parallelize([1, 2, 3, 4])
print(l.aggregate((0, 0), seqOp, combOp))
>>> (10, 4)
# with a different zeroValue : (3, 5)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
l = sc.parallelize([1, 2, 3, 4])
print(l.aggregate((3, 5), seqOp, combOp))
>>> (49, 69)
第二个输出是我没想到的。考虑到我刚刚将 (0, 0) 更改为 (3, 5),并且相应位置的数字会相加,我预计它会是 (13, 9)。
我认为这可能与分区有关。结果我们的 RDD 有 12 个分区!因此,根据文档,zeroValue 将成为每个分区的 x 的初始值。当分区合并时,每个空白分区的累加值为(3, 5)。 由于有 12 个分区,考虑其中 11 个为空,结果应该是:
(13 + 3 * 11, 9 + 5 * 11) = (46, 64)
仍然不是我们得到的。但我们可以看到,我们的计算距离成为实际结果仅差 1 (3, 5)。
{(46 + 3, 64 + 5) = (49, 69)}
我将分区数量减少到1(以确保所有数据都在同一个分区中并且没有分区为空)并执行相同的操作。结果仍然有同样的异常。
print(l.coalesce(1).aggregate((3, 5), seqOp, combOp))
>>> (16, 14)
这次我肯定期望 (13, 9),因为只有 1 个分区。但距离实际结果还是差了 1 (3, 5)
{(13 + 3, 9 + 5) = (16 , 14)}
为什么 RDD.aggregate() 的结果比第一个零值多了一个零值。 RDD 有多少个分区?另外,这个额外的 ZeroValue 在什么时候会添加到累计值中?
您正确地推断出分区数量是导致结果变化的原因。
如果您尝试运行以下代码片段,您将得到累加器的值
(13, 9)
。
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
zeroValue = (3, 5)
acc = zeroValue
for num in [1, 2, 3, 4]:
acc = seqOp(acc, num)
acc
看起来很有道理,不是吗?列表中的每个元素都会扮演
y
中seqOp
的角色,因此acc的第一个元素将相当于zeroValue[0] + sum([1, 2, 3, 4])
,即13。同时,在每一步,zeroValue[1]
都会加 1,得到 zeroValues[1] + len([1, 2, 3, 4])
,即 9。但是请注意,我们甚至不需要定义 combOp
。
现在,假设我们只有一个分区。我们的最终结果将是 reducing 这个结果,起始值为... zeroValue
,并带有聚合结果的函数。这个操作自然是combOp
,我们将其定义为(x[0] + y[0], x[1] + y[1])
。
我们在处理多个分区时得到您所描述的行为的原因是,reduce 部分发生在
.collect()
操作之后,其中所有结果都发送到 Spark 的驱动程序。每个空分区只有 zeroValue
,因为没有累积任何内容,而非空分区则发送 seqOp
的结果。然后,司机用combOp
将这个结果进一步减少到(49, 69)
。