如何计算合并的最佳numberOfPartitions?

问题描述 投票:14回答:3

所以,据我所知,一般情况下应该使用coalesce()

由于filter或其他可能导致减少原始数据集(RDD,DF)的操作,分区数量减少。 coalesce()对于在过滤大型数据集后更有效地运行操作非常有用。

我也明白它比repartition便宜,因为它只在必要时通过移动数据来减少改组。我的问题是如何定义coalesce所采用的参数(idealPartionionNo)。我正在研究一个项目,该项目是从另一位工程师传递给我的,他使用下面的计算来计算该参数的值。

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR

然后将其与partitioner对象一起使用:

val partitioner = new HashPartitioner(idealPartionionNo)

但也用于:

RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)

这是正确的方法吗? idealPartionionNo值计算背后的主要思想是什么?什么是REPARTITION_FACTOR?我一般如何定义它?

此外,由于YARN有责任在运行中识别可用的执行器,有没有办法获得该数字(AVAILABLE_EXECUTOR_INSTANCES)并使用它来计算idealPartionionNo(即用NO_OF_EXECUTOR_INSTANCES替换AVAILABLE_EXECUTOR_INSTANCES)?

理想情况下,表单的一些实际示例:

  • 这是一个数据集(大小);
  • 这是RDD / DF的一些转换和可能的重用。
  • 这是你应该重新分配/合并的地方。
  • 假设你有n执行器与m核心和分区因子等于k

然后:

  • 理想的分区数量是==> ???

另外,如果你能引用我一个很好的博客解释这些我会非常感激。

scala apache-spark rdd
3个回答
12
投票

在实践中,最佳分区数更多地取决于您拥有的数据,您使用的转换以及可用资源的整体配置。

  • 如果分区数太少,您将遇到长GC暂停,不同类型的内存问题,以及最后的次优资源利用率。
  • 如果分区数太高,则维护成本很容易超过处理成本。此外,如果使用非分布式还原操作(如reducetreeReduce相比),则大量分区会导致驱动程序的负载增加。

您可以找到一些规则,这些规则建议超额订阅分区与核心数量(因子2或3似乎是常见的)或将分区保持在一定的大小,但这不考虑您自己的代码:

  • 如果你分配很多,你可以期待长时间的GC暂停,并且最好使用较小的分区。
  • 如果某段代码很昂贵,那么您的shuffle成本可以通过更高的并发性来分摊。
  • 如果您有过滤器,则可以根据谓词的判别力来调整分区数(如果您希望保留5%的数据和99%的数据,则做出不同的决定)。

在我看来:

  • 使用一次性作业保持较高的数字分区以保持安全(较慢比失败更好)。
  • 使用可重复使用的作业从保守配置开始然后执行 - 监视 - 调整配置 - 重复。
  • 不要尝试根据执行程序或核心数使用固定数量的分区。首先要了解您的数据和代码,然后调整配置以反映您的理解。 通常,确定群集表现出稳定行为的每个分区的原始数据量相对容易(根据我的经验,它在几百兆字节的范围内,具体取决于格式,用于加载数据的数据结构,和配置)。这是你正在寻找的“神奇数字”。

你需要记住的一些事情:

  • 分区数不一定反映数据分布。任何需要随机播放的操作(*byKeyjoinRDD.partitionByDataset.repartition)都可能导致数据分布不均匀。始终监视作业是否存在严重数据倾斜的症状。
  • 一般的分区数不是常数。任何具有多个依赖项的操作(unioncoGroupjoin)都会影响分区数。

7
投票

您的问题是有效的,但Spark分区优化完全取决于您正在运行的计算。你需要有充分的理由重新分配/合并;如果你只计算一个RDD(即使它有大量稀疏填充的分区),那么任何重新分区/合并步骤都会让你失望。

重新划分与合并

repartition(n)(与coalesce(n, shuffle = true)coalesce(n, shuffle = false)相同)之间的区别与执行模型有关.shuffle模型采用原始RDD中的每个分区,随机将其数据发送到所有执行程序,并导致新的RDD(更小) no-shuffle模型创建一个新的RDD,它将多个分区作为一个任务加载。

让我们考虑一下这个计算:

sc.textFile("massive_file.txt")
  .filter(sparseFilterFunction) // leaves only 0.1% of the lines
  .coalesce(numPartitions, shuffle = shuffle)

如果shuffletrue,则文本文件/过滤器计算发生在textFile中的默认值给出的许多任务中,并且微小的过滤结果被洗牌。如果shufflefalse,那么总任务的数量最多为numPartitions

如果numPartitions为1,则差异非常明显。 shuffle模型将并行处理和过滤数据,然后将0.1%的过滤结果发送到一个执行器以进行下游DAG操作。 no-shuffle模型将从一开始就在一个核心上处理和过滤数据。

采取的步骤

考虑下游操作。如果您只是使用此数据集一次,那么您可能根本不需要重新分区。如果要保存过滤的RDD以供以后使用(例如,到磁盘),请考虑上面的权衡。熟悉这些模型需要经验,当一个表现更好时,请尝试两种方式,看看它们的表现如何!


3
投票

正如其他人已经回答的那样,没有计算出你要求的公式。也就是说,您可以对第一部分做出有根据的猜测,然后随着时间的推移对其进行微调。

第一步是确保您有足够的分区。如果每个执行程序都有NO_OF_EXECUTOR_INSTANCES执行程序和NO_OF_EXECUTOR_CORES核心,那么您可以同时处理NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES分区(每个分区将转到特定实例的特定核心)。也就是说,假设所有内容在核心之间平均分配,并且所有内容都需要完全相同的时间来处理。这种情况很少发生。由于本地性(例如,数据需要来自不同的节点),或者仅仅因为它们不平衡(例如,如果您的数据由根域分区,则包括分区在内),其中一些很可能会先于其他地方完成谷歌可能会很大)。这是REPARTITION_FACTOR发挥作用的地方。我们的想法是,我们“覆盖”每个核心,因此如果一个人完成得非常快,一个人完成得很慢,我们可以选择在他们之间划分任务。 2-3的因素通常是一个好主意。

现在让我们来看看单个分区的大小。假设您的整个数据大小为X MB,并且您有N个分区。每个分区平均为X / N MB。如果N相对于X较大,那么您可能具有非常小的平均分区大小(例如几KB)。在这种情况下,降低N通常是一个好主意,因为管理每个分区的开销变得太高。另一方面,如果大小非常大(例如几GB),那么你需要同时保存大量数据,这会导致诸如垃圾收集,高内存使用等问题。

最佳尺寸是一个很好的问题,但通常人们似乎更喜欢100-1000MB的分区,但实际上几十MB也可能是好的。

您应该注意的另一件事是计算分区的更改方式。例如,假设您从1000个分区开始,每个分区100MB,但随后过滤数据,以便每个分区变为1K,那么您可能应该合并。执行groupby或join时可能会发生类似问题。在这种情况下,分区的大小和分区的数量都会改变,并且可能达到不合需要的大小。

© www.soinside.com 2019 - 2024. All rights reserved.