我们有一个在 Spark 集群工作线程上进行计算的工作负载(CPU 密集型)。 结果被拉回驱动程序,该驱动程序拥有大量内存分配来通过 RDD .collect() 收集结果 然后对结果进行进一步处理,生成 pandas 数据框(预先存在的包逻辑,无法更改)。
然后需要将该 pandas 数据框存储到 databricks 中。 这是通过将 pandas 数据帧转换为 Spark 数据帧,然后调用 .saveAsTable 来完成的。
问题:对于一个有 900 列、只有 50k 行的表,将 pandas 数据帧转换为 Spark 数据帧需要 5 分钟。 我在谷歌上进行了大量搜索,我能收集到的唯一信息是 pandas 数据帧到 Spark 数据帧的转换会导致集群工作人员之间的数据自动分区。我相信这可能就是它如此慢的原因,因为将数据从 Spark 数据帧实际保存到 databricks 表的步骤只需要 10 秒(相比之下)。
有没有办法强制 Spark DataFrame 不分布/并行化?我希望它只是保留在驱动程序上,这样我就可以使用它很好的写入 api 来保存到 databricks 中。
我尝试设置:spark.default.parallelism 1来强制它不分配工作负载,但它似乎仍然将数据发送给工作人员,而不是留在驱动程序上(我可能是错的,但这就是它看起来的样子就像日志中一样)。
spark数据帧上有repartition()和coalesce()的方法,但是这些方法在数据帧创建之后应用而不是之前,这违背了在创建spark数据帧时试图节省时间的目的。
有什么想法吗?
delta_frame.write \
.mode("append") \
.option("delta.columnMapping.mode", "name") \
.option("mergeSchema", "true" if merge_schema else "false") \
.option("path", target_path) \
.partitionBy(partition_cols) \
.saveAsTable(full_table_name)
看来我的问题的前提有些错误。
pyspark dataframe 的整个概念与 pyspark RDD(弹性分布式数据集)直接相关。所以两者缺一不可。 无论您是否喜欢,pyspark dataframe 都会创建一个 RDD,这意味着它将花费大量开销将数据移至 JVM。
除非集群启用了箭头(默认情况下禁用),否则将 pandas 数据帧转换为 pyspark 数据帧的性能非常差。在我的实例中,超过 400 秒,而在集群上启用箭头时为 25 秒。 因此,我的答案是将其添加到集群配置中:
spark.sql.execution.arrow.pyspark.enable true
也可以在笔记本中完成:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
仅使用驱动程序将 pandas 数据帧转换为 pyspark 数据帧的整个想法很差,因为转换过程本身可以利用集群工作线程上的 cpu 来利用 arrow 加速到 Spark 数据帧的转换。