解释Spark中的聚合功能

问题描述 投票:43回答:8

我正在寻找一些更好的解释python中通过spark提供的聚合功能。

我的例子如下(使用Spark 1.2.0版本的pyspark)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

输出:

(10, 4)

我得到了预期的结果(10,4),它是1+2+3+4和4个元素的总和。如果我从(1,0)将传递给聚合函数的初始值更改为(0,0),我得到以下结果

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

输出:

(19, 4)

该值增加9.如果我将其更改为(2,0),则值将转至(28,4),依此类推。

有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,预计会看到(11,4)而不是我看到(19,4)

python apache-spark lambda aggregate rdd
8个回答
15
投票

我没有足够的声誉点来评论Maasg之前的回答。实际上零值对于seqop应该是“中性”的,这意味着它不会干扰seqop结果,如0朝向add,或1朝*;

你不应该尝试使用非中性值,因为它可能会被随意应用。此行为不仅与分区数量相关联。

我尝试了问题中所述的相同实验。使用1分区,零值应用3次。有2个分区,6次。有3个分区,9次,这将继续下去。


76
投票

我对接受的答案并不完全相信,JohnKnight的答案也有所帮助,所以这是我的观点:

首先,让我用自己的话来解释aggregate()

原型:

aggregate(zeroValue,seqOp,combOp)

描述:

aggregate()允许您获取RDD并生成与原始RDD中存储的类型不同的单个值。

参数:

  1. zeroValue:结果的初始化值,采用所需格式。
  2. seqOp:您要应用于RDD记录的操作。对分区中的每个记录运行一次。
  3. combOp:定义结果对象(每个分区一个)的组合方式。

例:

计算列表的总和和该列表的长度。将结果返回到一对(sum, length)

在Spark shell中,我首先创建了一个包含4个元素的列表,其中包含2个分区:

listRDD = sc.parallelize([1,2,3,4], 2)

然后我定义了我的seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

和我的组合:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

然后我汇总了:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

如您所见,我为变量提供了描述性名称,但让我进一步解释:

第一个分区有子列表[1,2]。我们将seqOp应用于该列表的每个元素,这将产生一个本地结果,一对(sum, length),它将在本地反映结果,仅在第一个分区中。

那么,让我们开始:local_result被初始化为zeroValue参数,我们提供了aggregate(),即(0,0)和list_element是列表的第一个元素,即1.结果这是发生的事情:

0 + 1 = 1
0 + 1 = 1

现在,本地结果是(1,1),这意味着,到目前为止,对于第一个分区,在仅处理第一个元素之后,总和为1,长度为1.注意,local_result从(0, 0),至(1,1)。

1 + 2 = 3
1 + 1 = 2

现在本地结果是(3,2),它将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。

为第二个分区做同样的事情,得到(7,2)。

现在我们将combOp应用于每个局部结果,以便我们可以形成最终的全局结果,如下所示:(3,2) + (7,2) = (10, 4)


'figure'中描述的示例:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

受到这个伟大的example的启发。


所以现在如果zeroValue不是(0,0),但是(1,0),人们会期望得到(8 + 4,2 + 2)=(12,4),这并不能解释你的体验。即使我们改变了我的例子的分区数量,我也无法再次获得。

这里的关键是JohnKnight的回答,其中指出zeroValue不仅类似于分区数量,而且可能应用的次数超出预期。


28
投票

Aggregate允许您随意转换和组合RDD的值。

它使用两个功能:

第一个转换并在本地聚合[U]中添加原始集合[T]的元素,并采用以下形式:(U,T)=> U.您可以将其视为折叠,因此它也需要零对于那个操作。此操作并行地应用于每个分区。

这里是问题的关键所在:这里应该使用的唯一值是还原操作的ZERO值。此操作在每个分区上本地执行,因此,向该零值添加任何内容将添加到结果乘以RDD的分区数。

第二个操作采用前一个操作[U]的结果类型的2个值,并将其组合成一个值。此操作将减少每个分区的部分结果并生成实际总数。

例如:给定一个字符串的RDD:

val rdd:RDD[String] = ???

假设您想要该RDD中字符串长度的总和,那么您可以这样做:

1)第一个操作将字符串转换为size(int)并累积size的值。

val stringSizeCummulator: (Int, String) => Int  = (total, string) => total + string.lenght`

2)为加法运算提供ZERO(0)

val ZERO = 0

3)将两个整数加在一起的操作:

val add: (Int, Int) => Int = _ + _

把它们放在一起:

rdd.aggregate(ZERO, stringSizeCummulator, add)

那么,为什么ZERO需要呢?当累加器函数应用于分区的第一个元素时,没有运行总计。 ZERO在这里使用。

例如。我的RDD是: - 分区1:[“跳转”,“结束”] - 分区2:[“the”,“wall”]

这将导致:

P1:

  1. stringSizeCummulator(ZERO,“Jump”)= 4
  2. stringSizeCummulator(4,“over”)= 8

P2:

  1. stringSizeCummulator(ZERO,“the”)= 3
  2. stringSizeCummulator(3,“wall”)= 7

减少:添加(P1,P2)= 15


1
投票

很棒的解释,它真的帮助我理解了聚合函数的底层工作。我玩了一段时间,发现如下。

  • 如果你使用acc为(0,0)那么它不会改变函数输出的结果。
  • 如果初始累加器被更改,那么它将处理结果,如下所示

[RDD元素之和+ acc初始值* RDD分区数+ acc初始值]

对于这里的问题,我建议检查分区,因为根据我的理解,分区的数量应该是8,因为每次我们处理RDD分区上的seq op时,它将以acc结果的初始总和开始,并且当它将进行梳状操作它将再次使用acc初始值一次。

例如清单(1,2,3,4)和加法(1,0)

通过RDD.partitions.size获取scala中的分区

如果分区为2且元素数为4则=> [10 + 1 * 2 + 1] =>(13,4)

如果分区为4且元素数为4则=> [10 + 1 * 4 + 1] =>(15,4)

希望这有帮助,你可以检查here的解释。谢谢。


1
投票

您可以使用以下代码(在scala中)来准确查看aggregate正在做什么。它构建了一个包含所有添加和合并操作的树:

sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]

val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)

然后,在shell中:

scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

所以,我们有这3个分区:[4],[1,2]和[3]。

scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

您可以将结果表示为树:

+
| \__________________
+                    +
| \________          | \
+          +         +   2
| \        | \       | \         
0  +       0  3      0  1
   | \
   0  4

您可以看到在驱动程序节点(树的左侧)上创建了第一个零元素,然后,所有分区的结果将逐个合并。您还会看到,如果您在问题中将0替换为1,则会在每个分区上为每个结果添加1,并且还会在驱动程序的初始值上加1。因此,您使用的零值的总时间是:

number of partitions + 1

所以,在你的情况下,结果

aggregate(
  (X, Y),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

将会:

(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)

aggregate的实现非常简单。它在RDD.scala, line 1107中定义:

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
}

0
投票

对于寻找上述示例的Scala等效代码的人 - 这里是。相同的逻辑,相同的输入/结果。

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)

scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)

0
投票

我尝试了很多关于这个问题的实验。最好为聚合设置num分区。 seqOp将处理每个分区并应用初始值,而且,当组合所有分区时,combOp也将应用初始值。那么,我提出这个问题的格式:

final result = sum(list) + num_Of_Partitions * initial_Value + 1

0
投票

感谢gsamaras。

我的观点如下,enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.