我想在 databricks pyspark 中实现一个逻辑,我想根据过去 14 天的更新值更新第二天的值。我正在使用循环来做到这一点。下面是代码,但速度非常慢,并且在某些点之后想要继续前进。
import time
start_time = time.time()
DateList=score_data.select("AsOfdate").distinct().orderBy('AsOfDate').rdd.flatMap(lambda x: x).collect()
score_data=score_data.withColumn('score_original',col('score')).withColumn('Amount_original',col('Amount'))
for date in DateList:
rolling_window=date+timedelta(-14)
print(date)
df_current=score_data.filter(col("AsOfDate").isin(date))
if 'rolling_median' in df_current.columns:
df_current=df_current.drop('rolling_median')
df_rolling = score_data.filter((col("AsOfDate")<=date) & (col("AsOfDate")>rolling_window))
rolling_median=df_rolling.filter(col('score').isNotNull()).groupBy('id').agg(F.expr('(percentile_approx(score,0.5)+percentile_approx(score,0.50001))/2').alias('rolling_median'))
df_current=df_current.join(rolling_median,'id','left')
df_current=df_current.withColumn('rolling_gap',abs(col('score')-col('rolling_median')))
df_current=df_current.withColumn('score',when(col('rolling_gap')>3,None).otherwise(col('score'))).withColumn('Amount',when(col('rolling_gap')>3,None).otherwise(col('Amount')))
score_data=score_data.filter(~(col("AsOfDate").isin(date)))
score_data=score_data.unionByName(df_current,allowMissingColumns=True)
print(f"Execution time: {time.time() - start_time}")
有没有办法改进这段代码?所附数据中,绿色部分显示更新后的分数,橙色部分显示原始分数。所以,第二天,2023 年 8 月 31 日,我想使用基于更新分数(空)的中位数。
在 for 循环中对数据框进行操作(包括删除列、连接和联合等操作)都是代码性能不佳的原因。这也不是很容易理解。 相反,尝试看看您是否可以执行以下操作: