Spark Persist方法未被调用

问题描述 投票:0回答:1

我无法理解persist(StorageLevel.Memory_and_disk)的工作原理。我下面的代码段运行正常。

val df1= Spark.read.from.hive.table()
            // Perform a high complex calculation & Aggregation.
            .toDF()
df1.write.toAnotherHiveTable.

这大约需要1个小时才能获得140gb的数据。阶段/任务有

1.CalcStage = 750个任务(耗时50-55分钟) 2.插入到Hive阶段= 100个任务(3-4分钟)

我对修改如下内容有新要求。

val df1= Spark.read.from.hive.table()
            // Perform a high complex calculation & Aggregation.
            .toDF()


val df2=df1.filter($"exchange" === "commodities")

val finalDF = df1.join(df2)
finalDF .write.toAnotherHiveTable.

对于相同数量的数据,这大约需要1小时40分钟。而且阶段/任务有

1.CalcStage = 750个任务(耗时1小时30分钟) 2.CalcStage = 750tasks(耗时1小时30分钟)//前两个阶段开始并行运行。两个阶段的日志 从Hive表条目中读取 3.插入到Hive阶段= 100个任务(8-10分钟)

我假设由于df1和df2依赖于df1 calc逻辑,所以它会进行calc&聚合。并且我添加了df1的persist,如下所示。

val df1= Spark.read.from.hive.table()
            // Perform a high complex calculation & Aggregation.
            .toDF().persist(StorageLevel.Memory_and_disk)


val df2=df1.filter($"exchange" === "commodities")

val finalDF = df1.join(df2)
finalDF .write.toAnotherHiveTable

我以为添加persistent将有助于减少运行完整的calc / Aggregation的第二个df。但是我错了。 DAG计划和阶段日志与以前的运行相同。没有观察到变化。

我在这里想念什么吗??请帮助我理解为什么persist方法未更改任何内容。

scala apache-spark apache-spark-sql persistence cloudera
1个回答
0
投票

您需要调用一个动作n,然后保存您的第一个数据帧,然后才将计算结果保存在您的内存中>]

© www.soinside.com 2019 - 2024. All rights reserved.