在Spark中强制立即缓存的最有效方法是什么?

问题描述 投票:0回答:1

出于计时目的,我需要在执行功能之前强制执行缓存。我最初的方法是使用count()操作,因为它将像take()一样跨所有分区缓存RDD,但是在计算,通信或时间方面有没有更有效的方法来强制使用它?

// Load data, partition and mark to be cached
val data = sc.textFile("input.txt").map(_.toInt)
val partitioner = new RangePartitioner(16, data)
val partitioned_data = data.partitionBy(partitioner).cache()

// Force cache with count or something more efficient
partitioned_data.count()

// Do something
something(partitioned_data)
scala apache-spark caching rdd
1个回答
0
投票

所有这些都取决于您要做什么。如果您发现您的环境已接近持久存储的极限,我建议您使用“保存到本地”,“清除缓存”,“重新加载”和“重新缓存”技术。但是,下面我对所有简单功能进行了分类,并针对2M记录文件运行它们,以显示它们的相对运行时间进行比较。

登上领奖台就是这样:

1st(三通):Take(1),Take(1000),First;时间:9秒

第四:计数;时间:17秒

第五:收集;时间:21秒

[Disclaimer-1:是的,我知道算错了,但是我宣布它是秘密的赢家,因为它被授予了许多随意的风格点,主要是因为我认为这个答案正在慢慢变成'Whoose Line Is It Anyway'。

免责声明2:所有测试均能够使用默认的Spark内存配置运行,但collect除外,在这里我需要将其设置为大约高一个因子,并且运行时间为21秒。

如果您想在家尝试,这里是您可以运行的代码(播放老旧的游戏节目音乐):

val inputDF = spark.read.format("").load("")

var arrayOfCommand : Array[String] = Array("")
var arrayOfTime : Array[Long] = Array("0".toLong)

inputDF.count

val inputDF2 = inputDF.selectExpr("*", "'Count Run' as CommandColumn").persist

val countStartTime = System.nanoTime()
inputDF2.count

val countEndTime = System.nanoTime()
val countRunTime = (countEndTime-countStartTime)/1000000000

arrayOfCommand = Array("Count")
arrayOfTime = Array(countRunTime)

spark.catalog.clearCache
val inputDF3 = inputDF.selectExpr("*", "'Take 1 Run' as CommandColumn").persist

val takeStartTime = System.nanoTime()
inputDF3.take(1)

val takeEndTime = System.nanoTime()
val takeRunTime = (takeEndTime-takeStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Take(1)")
arrayOfTime = arrayOfTime ++ Array(takeRunTime)

spark.catalog.clearCache
val inputDF4 = inputDF.selectExpr("*", "'Take 1000 Run' as CommandColumn").persist

val takeStartTime2 = System.nanoTime()
inputDF4.take(1000)

val takeEndTime2 = System.nanoTime()
val takeRunTime2 = (takeEndTime2-takeStartTime2)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Take(1000)")
arrayOfTime = arrayOfTime ++ Array(takeRunTime)

spark.catalog.clearCache
val inputDF5 = inputDF.selectExpr("*", "'Collect Run' as CommandColumn").persist

val collectStartTime = System.nanoTime()
inputDF5.collect

val collectEndTime = System.nanoTime()
val collectRunTime = (collectEndTime-collectStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Collect")
arrayOfTime = arrayOfTime ++ Array(collectRunTime)


spark.catalog.clearCache
val inputDF6 = inputDF.selectExpr("*", "'First Run' as CommandColumn").persist

val firstStartTime = System.nanoTime()
inputDF6.first

val firstEndTime = System.nanoTime()
val firstRunTime = (firstEndTime-firstStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("First")
arrayOfTime = arrayOfTime ++ Array(firstRunTime)
© www.soinside.com 2019 - 2024. All rights reserved.