RDD在内存中保留多长时间?

问题描述 投票:6回答:4

考虑到内存有限,我感觉火花会自动从每个节点中删除RDD。我想知道这个时间是可配置的吗? spark如何决定何时从内存中驱逐RDD

注意:我不是在谈论rdd.cache()

apache-spark rdd
4个回答
7
投票

我想知道这个时间是可配置的吗? spark如何决定何时从内存中驱逐RDD

RDD就像其他任何物体一样。如果你不持久/缓存它,它将作为托管语言下的任何其他对象,并且一旦没有指向它的活动根对象就被收集。

@Jacek指出的“如何”部分是一个名为ContextCleaner的对象的责任。主要是,如果你想要细节,this is what the cleaning method looks like

private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) =>
              doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}

如果你想了解更多,我建议浏览Sparks源码甚至更好,阅读@Jacek书名为"Mastering Apache Spark"This points to an explanation regarding ContextCleaner


5
投票

一般来说,这就是Yuval Itzchakov wrote“就像任何其他物体一样”,但......(总是“但是”,不是吗?)

在Spark中,由于我们有shuffle块(在Spark管理的其他块中),因此它并不那么明显。它们由运行在执行程序上的BlockManagers管理。当驱动程序上的对象被驱逐出内存时,它们必须以某种方式得到通知,对吧?

这就是ContextCleaner进入舞台的地方。它是Spark应用程序的垃圾收集器,负责应用程序范围内的shuffle,RDD,广播,累加器和检查点RDD的清理,旨在减少长时间运行的数据繁重的Spark应用程序的内存需求。

ContextCleaner在驱动程序上运行。它是在SparkContext启动时创建并立即启动的(启用了spark.cleaner.referenceTracking Spark属性,默认情况下是这样)。当SparkContext停止时它停止。

您可以通过使用jconsolejstack转储Spark应用程序中的所有线程来查看它。 ContextCleaner使用守护进程Spark Context Cleaner线程来清除RDD,shuffle和广播状态。

您还可以通过为INFO记录器启用DEBUGorg.apache.spark.ContextCleaner日志记录级别来查看其工作。只需将以下行添加到conf/log4j.properties

log4j.logger.org.apache.spark.ContextCleaner=DEBUG

1
投票

测量GC的影响

GC调优的第一步是收集有关垃圾收集发生频率和GC使用时间的统计信息。这可以通过将-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStamps添加到Java选项来完成。 (有关将Java选项传递给Spark作业的信息,请参阅配置指南。)下次运行Spark作业时,每次发生垃圾收集时,您都会看到工作日志中打印的消息。请注意,这些日志将位于群集的工作节点上(位于其工作目录中的stdout文件中),而不是位于驱动程序上。

高级GC调整

为了进一步调整垃圾收集,我们首先需要了解JVM中有关内存管理的一些基本信息:

Java堆空间分为Young和Old两个区域。 Young代表意味着持有短命的物体,而老一代则用于生命周期较长的物体。

年轻一代进一步分为三个区域[Eden,Survivor,Survivor 2]。

垃圾收集过程的简化描述:当Eden已满时,在Eden上运行次要GC,并将从Eden和Survivor1中存活的对象复制到Survivor2。幸存者地区被交换。如果一个对象足够大或Survivor2已满,则将其移至Old。最后,当Old接近满时,将调用完整的GC。


1
投票

根据Resilient Distributed Data-set论文 -

我们的工作节点将内存中的RDD分区缓存为Java对象。我们在RDD级别使用LRU替换策略(即,我们不从RDD中逐出分区以便从同一RDD加载其他分区),因为大多数操作都是扫描。到目前为止,我们发现这个简单的策略适用于我们所有的用户应用程序。需要更多控制的程序员也可以为每个RDD设置保留优先级作为缓存的参数。

© www.soinside.com 2019 - 2024. All rights reserved.