最近我看到了Spark的一些奇怪行为。
我在我的应用程序中有一个管道,我正在操作一个大数据集 - 伪代码:
val data = spark.read (...)
data.join(df1, "key") //etc, more transformations
data.cache(); // used to not recalculate data after save
data.write.parquet() // some save
val extension = data.join (..) // more transformations - joins, selects, etc.
extension.cache(); // again, cache to not double calculations
extension.count();
// (1)
extension.write.csv() // some other save
extension.groupBy("key").agg(some aggregations) //
extension.write.parquet() // other save, without cache it will trigger recomputation of whole dataset
然而,当我调用data.unpersist()
即就地(1)
时,Spark会从存储中删除所有数据集,也就是extension
数据集,它不是我试图解除的数据集。
这是预期的行为吗?如何在旧数据集中释放unpersist
的内存,而不是没有“链接下一个”的所有数据集?
我的设置:
问题看起来类似于Understanding Spark's caching,但在这里我在做一些行动之前没有人。起初我在计算所有内容然后保存到存储中 - 我不知道缓存在RDD中是否与数据集中的相同
解答Spark 2.4:
有关于数据集和缓存行为的正确性的票证,请参阅https://issues.apache.org/jira/browse/SPARK-24596
从Maryann Xue的描述来看,现在缓存将以下列方式工作:
“常规模式”意味着问题和@Avishek的答案和非级联模式意味着什么,extension
将不会是无人问津的