我的sparkDF.persist(DISK_ONLY)数据存储在哪里?

问题描述 投票:0回答:2

我想更多地了解一下spark中hadoop的持久化策略。

当我使用 DISK_ONLY 策略保留数据帧时,我的数据存储在哪里(路径/文件夹...)?我在哪里指定这个位置?

scala apache-spark hadoop persist
2个回答
10
投票

对于简短的答案,我们可以看一下关于 spark.local.dir

文档

用于 Spark 中“临时”空间的目录,包括映射输出文件和存储在磁盘上的 RDD。它应该位于系统中的快速本地磁盘上。它还可以是不同磁盘上的多个目录的逗号分隔列表。注意:在 Spark 1.0 及更高版本中,这将被集群管理器设置的 SPARK_LOCAL_DIRS(独立、Mesos)或 LOCAL_DIRS (YARN) 环境变量覆盖。

为了更深入地理解,我们可以查看代码:

DataFrame
(只是一个
Dataset[Row]
)基于
RDD
,并且利用相同的持久化机制。
RDD
将其委托给
SparkContext
,这将其标记为持久化。然后,该任务实际上由
org.apache.spark.storage
包中的几个类来处理:首先,
BlockManager
只是管理要持久化的数据块以及如何执行此操作的策略,将实际持久化委托给
DiskStore
(当然,当在磁盘上写入时)它代表用于写入的高级接口,并且又具有用于更底层操作的
DiskBlockManager

希望您了解现在应该在哪里查看,以便我们可以继续并了解数据实际保存在哪里以及我们如何配置它:

DiskBlockManager
调用帮助器
Utils.getConfiguredLocalDirs
,这是出于实用性我将在这里复制(取自链接的 2.2.1 版本,撰写本文时的最新版本):

def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
    if (isRunningInYarnContainer(conf)) {
        // If we are in yarn mode, systems can have different disk layouts so we must set it
        // to what Yarn on this system said was available. Note this assumes that Yarn has
        // created the directories already, and that they are secured so that only the
        // user has access to them.
        getYarnLocalDirs(conf).split(",")
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
        conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
        conf.getenv("SPARK_LOCAL_DIRS").split(",")
    } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
        // Mesos already creates a directory per Mesos task. Spark should use that directory
        // instead so all temporary files are automatically cleaned up when the Mesos task ends.
        // Note that we don't want this if the shuffle service is enabled because we want to
        // continue to serve shuffle files after the executors that wrote them have already exited.
        Array(conf.getenv("MESOS_DIRECTORY"))
    } else {
        if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
        logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
            "spark.shuffle.service.enabled is enabled.")
        }
        // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
        // configuration to point to a secure directory. So create a subdirectory with restricted
        // permissions under each listed directory.
        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
    }
}

我相信,代码是非常不言自明的,并且有很好的注释(并且与文档的内容完全匹配):在 Yarn 上运行时,有一个依赖于 Yarn 容器存储的特定策略,在 Mesos 中,它要么使用Mesos 沙箱(除非启用了 shuffle 服务),在所有其他情况下,它将转到

spark.local.dir
java.io.tmpdir
(可能是
/tmp/
)下设置的位置。

所以,如果您只是玩玩,数据很可能存储在

/tmp/
下,否则这在很大程度上取决于您的环境和配置。


4
投票

总结一下我的 YARN 环境:

在 @stefanobaghino 的指导下,我能够在加载纱线配置的代码中更进一步。

val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("")

yarn-default.xml 中的 yarn.nodemanager.local-dirs

选项中设置

我的问题的背景是,由错误引起的

2018-01-23 16:57:35,229 WARN org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /data/1/yarn/local error, used space above threshold of 98.5%, removing from list of valid directories

我的 Spark 作业有时会被终止,我想了解在运行作业时该磁盘是否也用于我的持久数据(实际上数量很大)。

事实证明,这正是使用磁盘策略保存数据时数据所在的文件夹。

非常感谢您在此问题上提供的所有有用指导!

© www.soinside.com 2019 - 2024. All rights reserved.