Pyspark 循环速度非常慢,一次又一次更新相同的数据帧

问题描述 投票:0回答:1

我想在 databricks pyspark 中实现一个逻辑,我想根据过去 14 天的更新值更新第二天的值。我正在使用循环来做到这一点。下面是代码,但速度非常慢,并且在某些点之后想要继续前进。

import time
start_time = time.time()

DateList=score_data.select("AsOfdate").distinct().orderBy('AsOfDate').rdd.flatMap(lambda x: x).collect()

score_data=score_data.withColumn('score_original',col('score')).withColumn('Amount_original',col('Amount'))

for date in DateList:
  rolling_window=date+timedelta(-14)
  print(date)
  df_current=score_data.filter(col("AsOfDate").isin(date))

  if 'rolling_median' in df_current.columns:
    df_current=df_current.drop('rolling_median')

  df_rolling = score_data.filter((col("AsOfDate")<=date) & (col("AsOfDate")>rolling_window))

  rolling_median=df_rolling.filter(col('score').isNotNull()).groupBy('id').agg(F.expr('(percentile_approx(score,0.5)+percentile_approx(score,0.50001))/2').alias('rolling_median'))

  df_current=df_current.join(rolling_median,'id','left')
  df_current=df_current.withColumn('rolling_gap',abs(col('score')-col('rolling_median')))
  df_current=df_current.withColumn('score',when(col('rolling_gap')>3,None).otherwise(col('score'))).withColumn('Amount',when(col('rolling_gap')>3,None).otherwise(col('Amount')))

  score_data=score_data.filter(~(col("AsOfDate").isin(date)))
  score_data=score_data.unionByName(df_current,allowMissingColumns=True)
print(f"Execution time: {time.time() - start_time}")  

有没有办法改进这段代码?所附数据中,绿色部分显示更新后的分数,橙色部分显示原始分数。所以,第二天,2023 年 8 月 31 日,我想使用基于更新分数(空)的中位数。

pyspark apache-spark-sql databricks vectorization aws-databricks
1个回答
0
投票

在 for 循环中对数据框进行操作(包括删除列、连接和联合等操作)都是代码性能不佳的原因。这也不是很容易理解。 相反,尝试看看您是否可以执行以下操作:

  1. 收集日期,并将过去 14 天的评分值存入字典中
  2. 将此字典传递给 UDF 函数以创建新的分数值。
  3. 在 UDF 内添加日期计算的 for 循环。
© www.soinside.com 2019 - 2024. All rights reserved.