停止 Spark 数据帧分发到集群 - 它需要保留在驱动程序上

问题描述 投票:0回答:1

我们有一个在 Spark 集群工作线程上进行计算的工作负载(CPU 密集型)。 结果被拉回驱动程序,该驱动程序拥有大量内存分配来通过 RDD .collect() 收集结果 然后对结果进行进一步处理,生成 pandas 数据框(预先存在的包逻辑,无法更改)。

然后需要将该 pandas 数据框存储到 databricks 中。 这是通过将 pandas 数据帧转换为 Spark 数据帧,然后调用 .saveAsTable 来完成的。

问题:对于一个有 900 列、只有 50k 行的表,将 pandas 数据帧转换为 Spark 数据帧需要 5 分钟。 我在谷歌上进行了大量搜索,我能收集到的唯一信息是 pandas 数据帧到 Spark 数据帧的转换会导致集群工作人员之间的数据自动分区。我相信这可能就是它如此慢的原因,因为将数据从 Spark 数据帧实际保存到 databricks 表的步骤只需要 10 秒(相比之下)。

有没有办法强制 Spark DataFrame 不分布/并行化?我希望它只是保留在驱动程序上,这样我就可以使用它很好的写入 api 来保存到 databricks 中。

我尝试设置:spark.default.parallelism 1来强制它不分配工作负载,但它似乎仍然将数据发送给工作人员,而不是留在驱动程序上(我可能是错的,但这就是它看起来的样子就像日志中一样)。

spark数据帧上有repartition()和coalesce()的方法,但是这些方法在数据帧创建之后应用而不是之前,这违背了在创建spark数据帧时试图节省时间的目的。

有什么想法吗?

delta_frame.write \
    .mode("append") \
    .option("delta.columnMapping.mode", "name") \
    .option("mergeSchema", "true" if merge_schema else "false") \
    .option("path", target_path) \
    .partitionBy(partition_cols) \
    .saveAsTable(full_table_name)
python pyspark apache-spark-sql databricks rdd
1个回答
0
投票

看来我的问题的前提有些错误。

pyspark dataframe 的整个概念与 pyspark RDD(弹性分布式数据集)直接相关。所以两者缺一不可。 无论您是否喜欢,pyspark dataframe 都会创建一个 RDD,这意味着它将花费大量开销将数据移至 JVM。

除非集群启用了箭头(默认情况下禁用),否则将 pandas 数据帧转换为 pyspark 数据帧的性能非常差。在我的实例中,超过 400 秒,而在集群上启用箭头时为 25 秒。 因此,我的答案是将其添加到集群配置中:

spark.sql.execution.arrow.pyspark.enable true

也可以在笔记本中完成:

启用基于箭头的列式数据传输

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

仅使用驱动程序将 pandas 数据帧转换为 pyspark 数据帧的整个想法很差,因为转换过程本身可以利用集群工作线程上的 cpu 来利用 arrow 加速到 Spark 数据帧的转换。

© www.soinside.com 2019 - 2024. All rights reserved.