谁可以在Spark中对“combineByKey”做出明确的解释?

问题描述 投票:3回答:2

我正在学习火花,但我无法理解这个功能combineByKey

>>> data = sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)] )
>>> data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+c2).collect()

输出是:

[('A', '1_2_'), ('C', '1_'), ('B', '1_2_')]

首先,我很困惑:第二步@lambda c, v : c+"@"+v在哪里?我从结果中找不到@

其次,我阅读了combineByKey的函数描述,但我对算法流程感到困惑。

python apache-spark
2个回答
7
投票

groupByKey调用不会尝试合并/组合值,因此这是一项昂贵的操作。

因此,combineByKey调用就是这样的优化。当使用combineByKey时,每个分区将值合并为一个值,然后将每个分区值合并为单个值。值得注意的是,组合值的类型不必与原始值的类型匹配,并且通常不会与原始值的类型相匹配。 combineByKey函数将3个函数作为参数:

  1. 一个创建组合器的函数。在aggregateByKey函数中,第一个参数只是一个初始零值。在combineByKey中,我们提供了一个函数,它将接受当前值作为参数,并返回将与其他值合并的新值。
  2. 第二个函数是一个合并函数,它接受一个值并将其合并/组合到先前收集的值中。
  3. 第三个函数将合并的值组合在一起。基本上,此函数采用在分区级别生成的新值并将它们组合,直到我们最终得到一个奇异值。

换句话说,要理解combineByKey,考虑它如何处理它处理的每个元素是有用的。当combineByKey遍历分区中的元素时,每个元素要么具有之前未见过的键,要么具有与前一个元素相同的键。

如果它是一个新元素,combineByKey使用我们提供的函数createCombiner()来为该键创建累加器的初始值。重要的是要注意,这是在第一次在每个分区中找到密钥时发生的,而不是仅在第一次在RDD中找到密钥时发生。

如果它是我们在处理该分区时看到的值,它将改为使用提供的函数mergeValue(),该函数用于该键的累加器的当前值和新值。

由于每个分区都是独立处理的,因此我们可以为同一个密钥设置多个累加器。当我们合并每个分区的结果时,如果两个或多个分区具有相同键的累加器,我们使用用户提供的mergeCombiners()函数合并累加器。

参考文献:


1
投票

'@'仅在每个分区中添加。在您的示例中,似乎每个分区中只有一个元素。尝试:

data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+'$'+c2).collect() $

并看到差异

© www.soinside.com 2019 - 2024. All rights reserved.