如何使用RDD的persist和cache?

问题描述 投票:2回答:2

请告诉我如何使用RDD方法Persist()和Cache(),似乎对于我通常用java写的传统程序来说,比如说sparkStreaming,它是一个持续执行的DAG,每次RDD的值都会被更新,因此peristcache也会被再次调用&再次导致覆盖RDD。

但是,正如下面的文档所示,这些方法似乎只对交互式shell有用,或者说,与仅仅将所需的RDD存储在任何参考变量中相比,我是否有任何方法可以在我的顺序程序中更有效地使用缓存的persist RDD。

Spark文档链接

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

VS

在一个顺序的火花流作业中,我认为这是非常相同的,不会反复评估。

JavaRDD sortedRDD =baseRDD.filter(f(x));

sortedRDD.count();
sortedRDD.saveAsNewHadoopAPIFile();
// Or Anything we want !   

如果您能帮助我解决这个疑问,我将非常感激。

java apache-spark bigdata spark-streaming
2个回答
3
投票

Spark中最重要的功能之一是在内存中跨操作持久化(或缓存)一个数据集。当你持久化一个RDD时,每个节点都会在内存中存储它计算出的任何分区,并在该数据集(或由其派生的数据集)的其他操作中重复使用它们。这使得未来的操作速度大大加快(通常快10倍以上)。缓存是迭代算法和快速交互式使用的关键工具.你可以使用其上的persist()或cache()方法将一个RDD标记为持久化。第一次在动作中计算时,它将被保存在节点的内存中。Spark的缓存是容错的--如果一个RDD的任何分区丢失,它将使用最初创建它的变换自动重新计算。cache()方法是使用默认存储级别的简写,它是StorageLevel.MEMORY_ONLY(在内存中存储反序列化对象)。

val linesWithSpark  = sc.textFile("/home/kishore/test.txt")
linesWithSpark.cache()
linesWithSpark.count()

它什么都不做。RDD.cache也是一种懒惰操作。文件仍然没有被读取。但现在RDD说 "读取这个文件,然后缓存内容"。如果你再第一次运行 linesWithSpark.count,文件就会被加载、缓存和计数。如果你第二次调用 linesWithSpark.count,操作将使用缓存。它将只是从缓存中获取数据并计算行数。


0
投票

缓存和持久化都是用来保存Spark RDD、Dataframe和Dataset的。但是,不同的是,RDD cache() 方法默认将其保存到内存中(仅限内存)而 persist() 方法用于将其存储到用户定义的存储级别。

当你持久化一个数据集时,每个节点都会把它的分区数据存储在内存中,并在该数据集的其他操作中重复使用它们。而且Spark在节点上的持久化数据是容错的,这意味着如果一个Dataset的任何分区丢失,它将会自动使用创建它的原始变换重新计算。

Spark cache() 数据集类中的方法在内部调用 persist() 方法,而该方法又使用 SparkSession.sharedState.cacheManager.cacheQuery。 来缓存DataFrame或Dataset的结果集。

 import spark.implicits._

val columns = Seq("Seqno","Quote")
  val data = Seq(("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy."))
  val df = data.toDF(columns:_*)

  val dfCache = df.cache()
  dfCache.show(false)

缓存 persist 有两个签名,第一个签名不接受任何参数,默认保存为 MEMORY_AND_DISK 储存层和第二个签名,该签名以 存储级别 作为一个参数来存储它到不同的存储级别。

val dfPersist = df.persist()
dfPersist.show(false)

使用第二个签名,你可以将DataFrameDataset保存到其中的一个存储层。仅限内存,仅限内存,仅限磁盘,仅限内存,仅限磁盘,仅限磁盘,仅限内存,仅限2,仅限内存,仅限磁盘,仅限磁盘。

val dfPersist = df.persist(StorageLevel.MEMORY_ONLY)
dfPersist.show(false)

如果有帮助,请告诉我。

© www.soinside.com 2019 - 2024. All rights reserved.