我使用此代码来更新 new_df。想法是获取 date_updated 和停止时间之间的所有记录,并为它们分配一个数字,我将在后续步骤中在分组中使用该数字。所以基本上在更新日期和停止时间之间为每个组分配相同的数字。
# Create an empty DataFrame
new_df = spark.createDataFrame([], df_filtered.schema)
i = 0
# Collect rows of df_filtered_sku1 as a list
rows = df_filtered_sku1.collect()
print('Length rows')
print(len(rows)) #781
for row in rows:
sku = row['Sku']
start_time = row['DATEUPDATED']
end_time = row['stop']
print(sku, start_time, end_time)
df_temp = df_filtered.filter((df_filtered.DATEUPDATED >= start_time) & (df_filtered.DATEUPDATED <= end_time) & (df_filtered.SKU == sku))
df_temp = df_temp.withColumn("counter", lit(i))
print('Temp')
#print(df_temp.count())
# Append the temporary DataFrame to the new_df DataFrame
print('new Frame')
new_df = new_df.union(df_temp)
#print(new_df.count())
i += 1
if i > 780: print(new_df.count()) #2531
display(new_df)
df_filtered_sku1 中的行数为 781,最终 new_df 的计数为 2531。但是当我尝试显示/显示 new_df 数据帧时,它永远不会结束,并且我检查了驱动程序日志,它卡在分配失败
这不是您实际问题的答案,但这些消息本身并不能表明存在问题,无论您做什么,它们都会发生。
您可能的问题是使用collect(它将所有行带到服务器,我假设您这样做是因为您想要循环行),然后在循环中使用withColumn来获取该收集中的所有行,这同样糟糕正如您可以从查询计划的角度得到的那样。不要进行显示,而是尝试进行解释(true)并查看您要求 Spark 完成工作的嵌套级别。
我实际上不确定该代码的用途,因此我无法推荐解决方法来实现该目标,但就目前情况而言,即使代码完成,也不会执行。要验证是否尝试将行数减少到 1,运行然后添加行以查看效果。