当我迭代地重复使用旧的缓存数据时,Spark Dataframe突然变得非常慢

问题描述 投票:1回答:2

当我尝试将缓存结果保存在List中并尝试通过每次迭代中最后一个列表中的所有数据计算新DataFrame时,问题就发生了。但是,即使我使用空的DataFrame并且每次都得到一个空的结果,该函数在大约8~12轮之后会突然变得很慢。

这是我的代码

testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){      
  // do some dummy transformation like union and cache the result
  val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache        

  // always get 0, of course  
  println(resultDf.count)  

  // benchmark action
  benchmark(resultDf.count)    

  testLoop(resultDf::lastDfList)
}

基准测试结果 1~6 round : < 200ms 7 round : 367ms 8 round : 918ms 9 round : 2476ms 10 round : 7833ms 11 round : 24231ms

我不认为GC或Block驱逐是我的问题,因为我已经使用了一个空的DataFrame,但我不知道是什么原因?我是否误解了缓存或其他什么的含义?

谢谢!


在阅读ImDarrenG的解决方案后,我将我的代码更改为以下内容:

spark.sparkContext.setCheckpointDir("/tmp")

testLoop(Nil)
def testLoop(lastDfList:List[DataFrame]){      
  // do some dummy transformation like union and cache the result
  val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf)}.cache        

  resultDf.checkpoint()  

  // always get 0, of course  
  println(resultDf.count)  

  // benchmark action
  benchmark(resultDf.count)    

  testLoop(resultDf::lastDfList)
}

但经过几次迭代后它仍然变得很慢。

caching apache-spark dataframe iteration reusability
2个回答
2
投票

在这里,您可以通过将DataFrames添加到resultDf的开头来创建lastDfList列表,并将其传递给下一个testLoop迭代:

testLoop(resultDf::lastDfList)

所以lastDfList每次传球都会变长。

这条线由DataFrameing union的每个成员创建一个新的lastDfList

val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){(df, lastDf) => df.union(lastDf))}.cache 

lastDfList的每个成员都是它的前辈的联盟,因此,Spark保持一个谱系,随着testLoop的每次传递变得指数级。

我预计时间的增加是由发展议程集团的内务管理造成的。缓存数据帧不需要重复转换,但仍然必须通过spark维护谱系。

缓存数据或不,看起来你正在构建一个非常复杂的DAG,将每个DataFrametestLoop的每次传递的所有前辈联合起来。

您可以使用checkpoint修剪谱系,并引入一些检查以防止无限递归。


1
投票

根据APIcodecheckpoint将返回一个新的数据集,而不是更改原始数据集。

© www.soinside.com 2019 - 2024. All rights reserved.