我正在寻找一些更好的解释python中通过spark提供的聚合功能。
我的例子如下(使用Spark 1.2.0版本的pyspark)
sc.parallelize([1,2,3,4]).aggregate(
(0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
输出:
(10, 4)
我得到了预期的结果(10,4)
,它是1+2+3+4
和4个元素的总和。如果我从(1,0)
将传递给聚合函数的初始值更改为(0,0)
,我得到以下结果
sc.parallelize([1,2,3,4]).aggregate(
(1, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
输出:
(19, 4)
该值增加9.如果我将其更改为(2,0)
,则值将转至(28,4)
,依此类推。
有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,预计会看到(11,4)
而不是我看到(19,4)
。
我没有足够的声誉点来评论Maasg之前的回答。实际上零值对于seqop应该是“中性”的,这意味着它不会干扰seqop结果,如0朝向add,或1朝*;
你不应该尝试使用非中性值,因为它可能会被随意应用。此行为不仅与分区数量相关联。
我尝试了问题中所述的相同实验。使用1分区,零值应用3次。有2个分区,6次。有3个分区,9次,这将继续下去。
我对接受的答案并不完全相信,JohnKnight的答案也有所帮助,所以这是我的观点:
首先,让我用自己的话来解释aggregate():
原型:
aggregate(zeroValue,seqOp,combOp)
描述:
aggregate()
允许您获取RDD并生成与原始RDD中存储的类型不同的单个值。
参数:
zeroValue
:结果的初始化值,采用所需格式。seqOp
:您要应用于RDD记录的操作。对分区中的每个记录运行一次。combOp
:定义结果对象(每个分区一个)的组合方式。例:
计算列表的总和和该列表的长度。将结果返回到一对
(sum, length)
。
在Spark shell中,我首先创建了一个包含4个元素的列表,其中包含2个分区:
listRDD = sc.parallelize([1,2,3,4], 2)
然后我定义了我的seqOp:
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
和我的组合:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
然后我汇总了:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
如您所见,我为变量提供了描述性名称,但让我进一步解释:
第一个分区有子列表[1,2]。我们将seqOp应用于该列表的每个元素,这将产生一个本地结果,一对(sum, length)
,它将在本地反映结果,仅在第一个分区中。
那么,让我们开始:local_result
被初始化为zeroValue
参数,我们提供了aggregate()
,即(0,0)和list_element
是列表的第一个元素,即1.结果这是发生的事情:
0 + 1 = 1
0 + 1 = 1
现在,本地结果是(1,1),这意味着,到目前为止,对于第一个分区,在仅处理第一个元素之后,总和为1,长度为1.注意,local_result
从(0, 0),至(1,1)。
1 + 2 = 3
1 + 1 = 2
现在本地结果是(3,2),它将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。
为第二个分区做同样的事情,得到(7,2)。
现在我们将combOp应用于每个局部结果,以便我们可以形成最终的全局结果,如下所示:(3,2) + (7,2) = (10, 4)
'figure'中描述的示例:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
受到这个伟大的example的启发。
所以现在如果zeroValue
不是(0,0),但是(1,0),人们会期望得到(8 + 4,2 + 2)=(12,4),这并不能解释你的体验。即使我们改变了我的例子的分区数量,我也无法再次获得。
这里的关键是JohnKnight的回答,其中指出zeroValue
不仅类似于分区数量,而且可能应用的次数超出预期。
Aggregate允许您随意转换和组合RDD的值。
它使用两个功能:
第一个转换并在本地聚合[U]中添加原始集合[T]的元素,并采用以下形式:(U,T)=> U.您可以将其视为折叠,因此它也需要零对于那个操作。此操作并行地应用于每个分区。
这里是问题的关键所在:这里应该使用的唯一值是还原操作的ZERO值。此操作在每个分区上本地执行,因此,向该零值添加任何内容将添加到结果乘以RDD的分区数。
第二个操作采用前一个操作[U]的结果类型的2个值,并将其组合成一个值。此操作将减少每个分区的部分结果并生成实际总数。
例如:给定一个字符串的RDD:
val rdd:RDD[String] = ???
假设您想要该RDD中字符串长度的总和,那么您可以这样做:
1)第一个操作将字符串转换为size(int)并累积size的值。
val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`
2)为加法运算提供ZERO(0)
val ZERO = 0
3)将两个整数加在一起的操作:
val add: (Int, Int) => Int = _ + _
把它们放在一起:
rdd.aggregate(ZERO, stringSizeCummulator, add)
那么,为什么ZERO需要呢?当累加器函数应用于分区的第一个元素时,没有运行总计。 ZERO在这里使用。
例如。我的RDD是: - 分区1:[“跳转”,“结束”] - 分区2:[“the”,“wall”]
这将导致:
P1:
P2:
减少:添加(P1,P2)= 15
很棒的解释,它真的帮助我理解了聚合函数的底层工作。我玩了一段时间,发现如下。
[RDD元素之和+ acc初始值* RDD分区数+ acc初始值]
对于这里的问题,我建议检查分区,因为根据我的理解,分区的数量应该是8,因为每次我们处理RDD分区上的seq op时,它将以acc结果的初始总和开始,并且当它将进行梳状操作它将再次使用acc初始值一次。
例如清单(1,2,3,4)和加法(1,0)
通过RDD.partitions.size获取scala中的分区
如果分区为2且元素数为4则=> [10 + 1 * 2 + 1] =>(13,4)
如果分区为4且元素数为4则=> [10 + 1 * 4 + 1] =>(15,4)
希望这有帮助,你可以检查here的解释。谢谢。
您可以使用以下代码(在scala中)来准确查看aggregate
正在做什么。它构建了一个包含所有添加和合并操作的树:
sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]
val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)
然后,在shell中:
scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))
所以,我们有这3个分区:[4],[1,2]和[3]。
scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))
您可以将结果表示为树:
+
| \__________________
+ +
| \________ | \
+ + + 2
| \ | \ | \
0 + 0 3 0 1
| \
0 4
您可以看到在驱动程序节点(树的左侧)上创建了第一个零元素,然后,所有分区的结果将逐个合并。您还会看到,如果您在问题中将0替换为1,则会在每个分区上为每个结果添加1,并且还会在驱动程序的初始值上加1。因此,您使用的零值的总时间是:
number of partitions + 1
。
所以,在你的情况下,结果
aggregate(
(X, Y),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
将会:
(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)
aggregate
的实现非常简单。它在RDD.scala, line 1107中定义:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
对于寻找上述示例的Scala等效代码的人 - 这里是。相同的逻辑,相同的输入/结果。
scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21
scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)
scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)
我尝试了很多关于这个问题的实验。最好为聚合设置num分区。 seqOp将处理每个分区并应用初始值,而且,当组合所有分区时,combOp也将应用初始值。那么,我提出这个问题的格式:
final result = sum(list) + num_Of_Partitions * initial_Value + 1