我们通常使用Spark作为存储在S3或HDFS上的数据的处理引擎。我们使用Databricks和EMR平台。我经常面临的问题之一是,当任务规模增大时,工作性能会严重下降。例如,假设我从具有不同转换级别(例如过滤,爆炸,联接等)的五个表中读取数据,从这些转换中合并数据的子集,然后进行进一步处理(例如,根据需要开窗功能等),然后再进行其他一些处理阶段,最后将最终输出保存到目标s3路径。如果我们不做这项工作就需要很长时间。但是,如果我们将临时中间数据帧保存(暂存)到S3并将此保存的(在S3上)数据帧用于下一步查询,则该工作将更快地完成。有没有类似的经历?除了检查点之外,还有没有更好的方法来处理这种长任务沿袭?
更奇怪的是,对于更长的谱系,spark会抛出预期的错误,例如找不到列,而如果临时暂存中间结果,则相同的代码将起作用。
通过保存数据帧或使用检查点来写入中间数据是修复它的唯一方法。您可能会遇到一个问题,即优化器要花很长时间才能生成计划。解决此问题的最快/最有效方法是使用localCheckpoint。这将在本地实现检查点。
val df = df.localCheckpoint()