在 Azure DataBricks(12.2 LTS(包括 Apache Spark 3.3.2、Scala 2.12)中,我在计算 Standard_DS5_v2 的笔记本中运行以下代码:
df_spark = spark.sql(my_qyery_to_delta_table)
df_pandas = df_spark.toPandas()
df_spark.unpersist()
import gc
del df_pandas
gc.collect()
数据获取需要几分钟才能执行(数据量很大,例如列中包含大型嵌套 json)。但是我观察到在此过程(屏幕)之后内存没有释放。如何解决这个问题以及这个问题的根源是什么?
[编辑]使用 df.unpersist() 的方法也没有帮助。
toPandas()
方法返回 Pandas DataFrame,该数据帧存储在驱动程序的内存中
节点。当您删除 df 变量并运行
gc.collect()
时,您仅释放 Pandas DataFrame 使用的内存,而不是之前创建的 Spark DataFrame 使用的内存。
要释放 Spark DataFrame 使用的内存,可以在使用完 DataFrame 后调用 DataFrame 上的
unpersist()
方法。
我创建了一个例子:
delta_df = spark.read.format("delta").load(delta_table_path)
df_pandas = delta_df.toPandas()
df_filtered.unpersist()
gc.collect()
print(df_pandas.head())
结果:
The count of filtered rows is: 500
id name
0 751 Name_751
1 752 Name_752
2 753 Name_753
3 754 Name_754
4 755 Name_755
通过调用
df_filtered.unpersist()
,我正在释放 Spark DataFrame 使用的内存。
此方法应该可以帮助您减少驱动程序节点上的内存使用量并防止内存不足错误。
在上面的代码中将 Spark DataFrame 转换为 Pandas DataFrame 使用 Unpersist 或删除 Spark DataFrame。 这一步对于释放Spark端资源很重要 您可以使用
df_filtered.unpersist()
或 df_filtered = None
然后显式调用垃圾收集,然后您可以根据需要继续使用 df_pandas