我正在使用 6 个工作线程运行 PySpark 作业,在此作业期间,我的所有转换都在多个工作线程中执行,但是当我尝试将输出保存为镶木地板文件时,我可以从 Spark UI 中看到仅使用 1 个工作线程执行器,这使得它与其他转换相比非常慢,我尝试了不同的方法,更改重新分区,使用 .persist() 保存到内存中,但没有任何改变,这是我的代码:
self.data = self.data.repartition(6)
self.data.write.parquet(self.output_local_path + "/data", mode="overwrite", compression="snappy")
不要介意执行时间,我只是尝试使用一些小文件来调试它,因为我的实际数据转换大约需要 2-3 分钟,但写入 parquet 可能需要大约 20 分钟,这确实太多了。 (大约 70M 行 x 60 列)
你们能帮我确定我在这里缺少什么吗?谢谢!
修复了它,问题并不完全来自 .write 函数,在这个过程中我正在做一个倾斜的外连接,为了解决这个问题,这是我使用的代码:
from pyspark.sql.functions import broadcast
spark_df.join(broadcast(df_rates),...)