Scala Spark / Databricks:.cache()不阻止重新计算

问题描述 投票:0回答:1

这涉及一些复杂性,在某些基本知识上我可能还不清楚。去:

据我了解,spark具有“转变”和“动作”。转换懒惰地建立起您想要做什么的描述,而行动则使之成为现实。这可以提高性能(允许优化计划),或者如果您在单个数据帧上使用多个操作,则可能导致重复的工作,从而导致转换反复触发。为了避免这种情况,.cache()告诉Spark实际上“保存其工作”,因此您调用它的数据帧不应继续被重新计算。

我的问题是它似乎没有这样做。我有一个函数“ Foo”,它需要进行大量计算才能生成(非常小的)数据帧。 Foo运行很快,我可以显示结果。我还有另一个函数“ Bar”,它对数据框执行大量操作。 Bar在(大)原始输入上快速运行,但是在foo的输出上运行非常慢,甚至是缓存和合并。我还可以通过将foo的输出写入磁盘然后重新读取它来“强制”缓存,这时bar快速运行:

display(bar(bigDF)) //Fast!

val profile = foo(bigDF).coalesce(1).cache()
display(profile) //Also fast! (and profile only has 2 rows, ~80 columns)

display(bar(profile)) //Slow!

profile
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("filename.csv")
val dumb = spark.read.format("csv").option("header", "true").load("filename.csv")
display(bar(dumb)) //Fast again

对我来说,这表示.cache()不能按我认为的方式工作-慢速调用会反复重新调用foo中的转换,除非我将其写入磁盘并强制将其“忘记”它的历史。有人可以解释我所缺少的吗?

scala apache-spark databricks
1个回答
0
投票

[cache”正在按照您的期望做,似乎正在发生奇怪的事情。

我希望coalesce(1)是问题,请尝试将其遗忘并测试其运行速度是否更快。可能是它破坏了bar的并行性。

如果没有任何帮助,请尝试使用checkpoint而不是cache,这可能是查询计划很长且很复杂,checkpoint会截断它(将其写入磁盘)

为了进行进一步的分析,您需要进入SparkUI来分析作业

© www.soinside.com 2019 - 2024. All rights reserved.