Pyspark 转换导致内存不足问题

问题描述 投票:0回答:1

所以我有一个包含多个列的 Spark 数据框,这些列是复杂的结构。我正在尝试根据另一个结构列中的字段值转换其中一个结构列中的字段值。我正在使用

spark 3.5
,所以我使用
withField
函数。转变:

df = df.withColumn(
    "Column_1", 
    F.when(
    F.col("Column_2.field_2").isNotNull(), F.col("Column_1").withField("field_1", F.col("Column_2.field_2")*F.lit(1000000))
    ))

我正在尝试根据

Column_1.field_1
的值更新
Column_2.field_2
的值。

当我尝试在转换后显示/写入数据帧时,需要很长时间并且集群驱动程序崩溃并出现以下错误:

java.lang.OutOfMemoryError : GC overhead limit exceeded

这不是正确的使用方法吗

withField
?在计划或提供
df.explain()
的输出时,它不会给我任何错误。尽管给出
df.explain()
的输出确实需要更多的时间(50-60 秒,而其他转换链需要一两秒)。

此外,数据帧中的数据量非常低(复杂结构中存在大量空值),增量不到 100 行和几百 KB。

apache-spark pyspark databricks azure-databricks
1个回答
0
投票

尝试转换具有复杂结构列的 Spark DataFrame 时出现内存不足错误。 您看到的错误消息表明垃圾收集器花费了太多时间尝试释放内存, 当JVM没有足够的可用内存时,就会发生这种情况。

出现此错误的一个原因是分配给 Spark 驱动程序的内存量不足以处理 DataFrame 的大小。 您可以尝试通过设置spark.driver.memory配置属性来增加分配给驱动程序的内存量。 例如,您可以将其设置为 8g,为驱动程序分配8 GB内存:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("ccapp") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

正如您所提到的,您正在尝试根据 Column_2.field_2 的值更新 Column_1.field_1 的值。

我尝试了以下方法作为示例:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
struct_schema_1 = StructType([
    StructField("_1", IntegerType()),
    StructField("_2", StringType())
])
struct_schema_2 = StructType([
    StructField("_1", IntegerType()),
    StructField("_2", StringType())
])
data = [
    ((10, "value_1"), (20, "value_2")),
    ((30, "value_3"), (None, "value_4")),
    ((50, "value_5"), (60, "value_6")),
]
df = spark.createDataFrame(data, StructType([
    StructField("Column_1", struct_schema_1),
    StructField("Column_2", struct_schema_2)
]))
df = df.withColumn(
    "Column_1", 
    F.when(
        F.col("Column_2._1").isNotNull(),
        F.struct(
            F.col("Column_1._1") * F.col("Column_2._1"),
            F.col("Column_1._2")
        ).cast(struct_schema_1)
    ).otherwise(F.col("Column_1"))
)
df.show(truncate=False)

结果:

+---------------+---------------+
|Column_1       |Column_2       |
+---------------+---------------+
|{200, value_1} |{20, value_2}  |
|{30, value_3}  |{NULL, value_4}|
|{3000, value_5}|{60, value_6}  |
+---------------+---------------+
© www.soinside.com 2019 - 2024. All rights reserved.