这涉及一些复杂性,在某些基本知识上我可能还不清楚。去:
据我了解,spark具有“转变”和“动作”。转换懒惰地建立起您想要做什么的描述,而行动则使之成为现实。这可以提高性能(允许优化计划),或者如果您在单个数据帧上使用多个操作,则可能导致重复的工作,从而导致转换反复触发。为了避免这种情况,.cache()告诉Spark实际上“保存其工作”,因此您调用它的数据帧不应继续被重新计算。
我的问题是它似乎没有这样做。我有一个函数“ Foo”,它需要进行大量计算才能生成(非常小的)数据帧。 Foo运行很快,我可以显示结果。我还有另一个函数“ Bar”,它对数据框执行大量操作。 Bar在(大)原始输入上快速运行,但是在foo的输出上运行非常慢,甚至是缓存和合并。我还可以通过将foo的输出写入磁盘然后重新读取它来“强制”缓存,这时bar快速运行:
display(bar(bigDF)) //Fast!
val profile = foo(bigDF).coalesce(1).cache()
display(profile) //Also fast! (and profile only has 2 rows, ~80 columns)
display(bar(profile)) //Slow!
profile
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("filename.csv")
val dumb = spark.read.format("csv").option("header", "true").load("filename.csv")
display(bar(dumb)) //Fast again
对我来说,这表示.cache()不能按我认为的方式工作-慢速调用会反复重新调用foo中的转换,除非我将其写入磁盘并强制将其“忘记”它的历史。有人可以解释我所缺少的吗?
[cache
”正在按照您的期望做,似乎正在发生奇怪的事情。
我希望coalesce(1)
是问题,请尝试将其遗忘并测试其运行速度是否更快。可能是它破坏了bar
的并行性。
如果没有任何帮助,请尝试使用checkpoint
而不是cache
,这可能是查询计划很长且很复杂,checkpoint
会截断它(将其写入磁盘)
为了进行进一步的分析,您需要进入SparkUI来分析作业