Databricks Spark 抛出 [GC(分配失败)] 消息

问题描述 投票:0回答:1

我使用此代码来更新 new_df。想法是获取 date_updated 和停止时间之间的所有记录,并为它们分配一个数字,我将在后续步骤中在分组中使用该数字。所以基本上在更新日期和停止时间之间为每个组分配相同的数字。

# Create an empty DataFrame
new_df = spark.createDataFrame([], df_filtered.schema)
i = 0

# Collect rows of df_filtered_sku1 as a list
rows = df_filtered_sku1.collect()
print('Length rows')
print(len(rows)) #781

for row in rows:
    sku = row['Sku']
    start_time = row['DATEUPDATED']
    end_time = row['stop']
    print(sku, start_time, end_time)

    df_temp = df_filtered.filter((df_filtered.DATEUPDATED >= start_time) & (df_filtered.DATEUPDATED <= end_time) & (df_filtered.SKU == sku))
    
    df_temp = df_temp.withColumn("counter", lit(i))
    print('Temp')
    #print(df_temp.count())
    
    # Append the temporary DataFrame to the new_df DataFrame
    print('new Frame')
    new_df = new_df.union(df_temp)
    #print(new_df.count())
    
    i += 1
    if i > 780: print(new_df.count()) #2531
display(new_df)

df_filtered_sku1 中的行数为 781,最终 new_df 的计数为 2531。但是当我尝试显示/显示 new_df 数据帧时,它永远不会结束,并且我检查了驱动程序日志,它卡在分配失败

集群规格:

python apache-spark pyspark databricks
1个回答
0
投票

这不是您实际问题的答案,但这些消息本身并不能表明存在问题,无论您做什么,它们都会发生。

您可能的问题是使用collect(它将所有行带到服务器,我假设您这样做是因为您想要循环行),然后在循环中使用withColumn来获取该收集中的所有行,这同样糟糕正如您可以从查询计划的角度得到的那样。不要进行显示,而是尝试进行解释(true)并查看您要求 Spark 完成工作的嵌套级别。

我实际上不确定该代码的用途,因此我无法推荐解决方法来实现该目标,但就目前情况而言,即使代码完成,也不会执行。要验证是否尝试将行数减少到 1,运行然后添加行以查看效果。

© www.soinside.com 2019 - 2024. All rights reserved.