RDD.aggregate() 如何处理分区?

问题描述 投票:0回答:1

我是 Spark 的新手,并试图了解像reduce、aggregate 等函数是如何工作的。 在查看 RDD.aggregate() 时,我尝试将 ZeroValue 更改为除恒等式之外的其他值(0 表示加法,1 表示乘法),以了解内部是如何工作的。

这是我尝试过的:

# with identity zeroValue : (0, 0)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
l = sc.parallelize([1, 2, 3, 4])
print(l.aggregate((0, 0), seqOp, combOp))
>>> (10, 4)

# with a different zeroValue : (3, 5)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
l = sc.parallelize([1, 2, 3, 4])
print(l.aggregate((3, 5), seqOp, combOp))
>>> (49, 69)

第二个输出是我没想到的。考虑到我刚刚将 (0, 0) 更改为 (3, 5),并且相应位置的数字会相加,我预计它会是 (13, 9)。

我认为这可能与分区有关。结果我们的 RDD 有 12 个分区!因此,根据文档,zeroValue 将成为每个分区的 x 的初始值。当分区合并时,每个空白分区的累加值为(3, 5)。 由于有 12 个分区,考虑其中 11 个为空,结果应该是:

(13 + 3 * 11, 9 + 5 * 11) = (46, 64)

仍然不是我们得到的。但我们可以看到,我们的计算距离成为实际结果仅差 1 (3, 5)。

{(46 + 3, 64 + 5) = (49, 69)}

我将分区数量减少到1(以确保所有数据都在同一个分区中并且没有分区为空)并执行相同的操作。结果仍然有同样的异常。

print(l.coalesce(1).aggregate((3, 5), seqOp, combOp))
>>> (16, 14)

这次我肯定期望 (13, 9),因为只有 1 个分区。但距离实际结果还是差了 1 (3, 5)

{(13 + 3, 9 + 5) = (16 , 14)}

为什么 RDD.aggregate() 的结果比第一个零值多了一个零值。 RDD 有多少个分区?另外,这个额外的 ZeroValue 在什么时候会添加到累计值中?

apache-spark pyspark bigdata rdd apache-spark-dataset
1个回答
0
投票

您正确地推断出分区数量是导致结果变化的原因。

如果您尝试运行以下代码片段,您将得到累加器的值

(13, 9)

seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
zeroValue = (3, 5)
acc = zeroValue
for num in [1, 2, 3, 4]:
    acc = seqOp(acc, num)
acc

看起来很有道理,不是吗?列表中的每个元素都会扮演

y
seqOp
的角色,因此acc的第一个元素将相当于
zeroValue[0] + sum([1, 2, 3, 4])
,即13。同时,在每一步,
zeroValue[1]
都会加 1,得到
zeroValues[1] + len([1, 2, 3, 4])
,即 9。但是请注意,我们甚至不需要定义
combOp
。 现在,假设我们只有一个分区。我们的最终结果将是 reducing 这个结果,起始值为...
zeroValue
,并带有聚合结果的函数。这个操作自然是
combOp
,我们将其定义为
(x[0] + y[0], x[1] + y[1])

我们在处理多个分区时得到您所描述的行为的原因是,reduce 部分发生在

.collect()
操作之后,其中所有结果都发送到 Spark 的驱动程序。每个空分区只有
zeroValue
,因为没有累积任何内容,而非空分区则发送
seqOp
的结果。然后,司机用
combOp
将这个结果进一步减少到
(49, 69)

© www.soinside.com 2019 - 2024. All rights reserved.