所以,据我所知,一般情况下应该使用coalesce()
:
由于
filter
或其他可能导致减少原始数据集(RDD,DF)的操作,分区数量减少。coalesce()
对于在过滤大型数据集后更有效地运行操作非常有用。
我也明白它比repartition
便宜,因为它只在必要时通过移动数据来减少改组。我的问题是如何定义coalesce
所采用的参数(idealPartionionNo
)。我正在研究一个项目,该项目是从另一位工程师传递给我的,他使用下面的计算来计算该参数的值。
// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)
val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
然后将其与partitioner
对象一起使用:
val partitioner = new HashPartitioner(idealPartionionNo)
但也用于:
RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
这是正确的方法吗? idealPartionionNo
值计算背后的主要思想是什么?什么是REPARTITION_FACTOR
?我一般如何定义它?
此外,由于YARN有责任在运行中识别可用的执行器,有没有办法获得该数字(AVAILABLE_EXECUTOR_INSTANCES
)并使用它来计算idealPartionionNo
(即用NO_OF_EXECUTOR_INSTANCES
替换AVAILABLE_EXECUTOR_INSTANCES
)?
理想情况下,表单的一些实际示例:
n
执行器与m
核心和分区因子等于k
然后:
另外,如果你能引用我一个很好的博客解释这些我会非常感激。
在实践中,最佳分区数更多地取决于您拥有的数据,您使用的转换以及可用资源的整体配置。
reduce
与treeReduce
相比),则大量分区会导致驱动程序的负载增加。您可以找到一些规则,这些规则建议超额订阅分区与核心数量(因子2或3似乎是常见的)或将分区保持在一定的大小,但这不考虑您自己的代码:
在我看来:
你需要记住的一些事情:
*byKey
,join
,RDD.partitionBy
,Dataset.repartition
)都可能导致数据分布不均匀。始终监视作业是否存在严重数据倾斜的症状。union
,coGroup
,join
)都会影响分区数。您的问题是有效的,但Spark分区优化完全取决于您正在运行的计算。你需要有充分的理由重新分配/合并;如果你只计算一个RDD(即使它有大量稀疏填充的分区),那么任何重新分区/合并步骤都会让你失望。
repartition(n)
(与coalesce(n, shuffle = true)
和coalesce(n, shuffle = false)
相同)之间的区别与执行模型有关.shuffle模型采用原始RDD中的每个分区,随机将其数据发送到所有执行程序,并导致新的RDD(更小) no-shuffle模型创建一个新的RDD,它将多个分区作为一个任务加载。
让我们考虑一下这个计算:
sc.textFile("massive_file.txt")
.filter(sparseFilterFunction) // leaves only 0.1% of the lines
.coalesce(numPartitions, shuffle = shuffle)
如果shuffle
是true
,则文本文件/过滤器计算发生在textFile
中的默认值给出的许多任务中,并且微小的过滤结果被洗牌。如果shuffle
是false
,那么总任务的数量最多为numPartitions
。
如果numPartitions
为1,则差异非常明显。 shuffle模型将并行处理和过滤数据,然后将0.1%的过滤结果发送到一个执行器以进行下游DAG操作。 no-shuffle模型将从一开始就在一个核心上处理和过滤数据。
考虑下游操作。如果您只是使用此数据集一次,那么您可能根本不需要重新分区。如果要保存过滤的RDD以供以后使用(例如,到磁盘),请考虑上面的权衡。熟悉这些模型需要经验,当一个表现更好时,请尝试两种方式,看看它们的表现如何!
正如其他人已经回答的那样,没有计算出你要求的公式。也就是说,您可以对第一部分做出有根据的猜测,然后随着时间的推移对其进行微调。
第一步是确保您有足够的分区。如果每个执行程序都有NO_OF_EXECUTOR_INSTANCES执行程序和NO_OF_EXECUTOR_CORES核心,那么您可以同时处理NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES分区(每个分区将转到特定实例的特定核心)。也就是说,假设所有内容在核心之间平均分配,并且所有内容都需要完全相同的时间来处理。这种情况很少发生。由于本地性(例如,数据需要来自不同的节点),或者仅仅因为它们不平衡(例如,如果您的数据由根域分区,则包括分区在内),其中一些很可能会先于其他地方完成谷歌可能会很大)。这是REPARTITION_FACTOR发挥作用的地方。我们的想法是,我们“覆盖”每个核心,因此如果一个人完成得非常快,一个人完成得很慢,我们可以选择在他们之间划分任务。 2-3的因素通常是一个好主意。
现在让我们来看看单个分区的大小。假设您的整个数据大小为X MB,并且您有N个分区。每个分区平均为X / N MB。如果N相对于X较大,那么您可能具有非常小的平均分区大小(例如几KB)。在这种情况下,降低N通常是一个好主意,因为管理每个分区的开销变得太高。另一方面,如果大小非常大(例如几GB),那么你需要同时保存大量数据,这会导致诸如垃圾收集,高内存使用等问题。
最佳尺寸是一个很好的问题,但通常人们似乎更喜欢100-1000MB的分区,但实际上几十MB也可能是好的。
您应该注意的另一件事是计算分区的更改方式。例如,假设您从1000个分区开始,每个分区100MB,但随后过滤数据,以便每个分区变为1K,那么您可能应该合并。执行groupby或join时可能会发生类似问题。在这种情况下,分区的大小和分区的数量都会改变,并且可能达到不合需要的大小。