考虑到内存有限,我感觉火花会自动从每个节点中删除RDD。我想知道这个时间是可配置的吗? spark如何决定何时从内存中驱逐RDD
注意:我不是在谈论rdd.cache()
我想知道这个时间是可配置的吗? spark如何决定何时从内存中驱逐RDD
RDD
就像其他任何物体一样。如果你不持久/缓存它,它将作为托管语言下的任何其他对象,并且一旦没有指向它的活动根对象就被收集。
@Jacek指出的“如何”部分是一个名为ContextCleaner
的对象的责任。主要是,如果你想要细节,this is what the cleaning method looks like:
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
如果你想了解更多,我建议浏览Sparks源码甚至更好,阅读@Jacek书名为"Mastering Apache Spark"(This points to an explanation regarding ContextCleaner
)
一般来说,这就是Yuval Itzchakov wrote“就像任何其他物体一样”,但......(总是“但是”,不是吗?)
在Spark中,由于我们有shuffle块(在Spark管理的其他块中),因此它并不那么明显。它们由运行在执行程序上的BlockManagers管理。当驱动程序上的对象被驱逐出内存时,它们必须以某种方式得到通知,对吧?
这就是ContextCleaner进入舞台的地方。它是Spark应用程序的垃圾收集器,负责应用程序范围内的shuffle,RDD,广播,累加器和检查点RDD的清理,旨在减少长时间运行的数据繁重的Spark应用程序的内存需求。
ContextCleaner在驱动程序上运行。它是在SparkContext
启动时创建并立即启动的(启用了spark.cleaner.referenceTracking
Spark属性,默认情况下是这样)。当SparkContext
停止时它停止。
您可以通过使用jconsole
或jstack
转储Spark应用程序中的所有线程来查看它。 ContextCleaner使用守护进程Spark Context Cleaner线程来清除RDD,shuffle和广播状态。
您还可以通过为INFO
记录器启用DEBUG
或org.apache.spark.ContextCleaner
日志记录级别来查看其工作。只需将以下行添加到conf/log4j.properties
:
log4j.logger.org.apache.spark.ContextCleaner=DEBUG
测量GC的影响
GC调优的第一步是收集有关垃圾收集发生频率和GC使用时间的统计信息。这可以通过将-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStamps添加到Java选项来完成。 (有关将Java选项传递给Spark作业的信息,请参阅配置指南。)下次运行Spark作业时,每次发生垃圾收集时,您都会看到工作日志中打印的消息。请注意,这些日志将位于群集的工作节点上(位于其工作目录中的stdout文件中),而不是位于驱动程序上。
高级GC调整
为了进一步调整垃圾收集,我们首先需要了解JVM中有关内存管理的一些基本信息:
Java堆空间分为Young和Old两个区域。 Young代表意味着持有短命的物体,而老一代则用于生命周期较长的物体。
年轻一代进一步分为三个区域[Eden,Survivor,Survivor 2]。
垃圾收集过程的简化描述:当Eden已满时,在Eden上运行次要GC,并将从Eden和Survivor1中存活的对象复制到Survivor2。幸存者地区被交换。如果一个对象足够大或Survivor2已满,则将其移至Old。最后,当Old接近满时,将调用完整的GC。
根据Resilient Distributed Data-set论文 -
我们的工作节点将内存中的RDD分区缓存为Java对象。我们在RDD级别使用LRU替换策略(即,我们不从RDD中逐出分区以便从同一RDD加载其他分区),因为大多数操作都是扫描。到目前为止,我们发现这个简单的策略适用于我们所有的用户应用程序。需要更多控制的程序员也可以为每个RDD设置保留优先级作为缓存的参数。