Spark Scala: extend a window result through to the end of the window

Problem description

I will explain my problem using the initial DataFrame and the result I want to achieve:

import spark.implicits._
import org.apache.spark.sql.functions.asc

val df_997 = Seq(
  (1,1,7,10), (1,10,4,300), (1,3,14,50), (1,20,24,70), (1,30,12,90),
  (2,10,4,900), (2,25,30,40), (2,15,21,60), (2,5,10,80)
).toDF("policyId","FECMVTO","aux","IND_DEF").orderBy(asc("policyId"), asc("FECMVTO"))
df_997.show
+--------+-------+---+-------+
|policyId|FECMVTO|aux|IND_DEF|
+--------+-------+---+-------+
|       1|      1|  7|     10|
|       1|      3| 14|     50|
|       1|     10|  4|    300|
|       1|     20| 24|     70|
|       1|     30| 12|     90|
|       2|      5| 10|     80|
|       2|     10|  4|    900|
|       2|     15| 21|     60|
|       2|     25| 30|     40|
+--------+-------+---+-------+

Imagine I have partitioned this DF by the column policyId and created a row_num column based on it, to make the windows easier to see:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val win = Window.partitionBy("policyId").orderBy("FECMVTO")

val df_998 = df_997.withColumn("row_num",row_number().over(win))
df_998.show
+--------+-------+---+-------+-------+
|policyId|FECMVTO|aux|IND_DEF|row_num|
+--------+-------+---+-------+-------+
|       1|      1|  7|     10|      1|
|       1|      3| 14|     50|      2|
|       1|     10|  4|    300|      3|
|       1|     20| 24|     70|      4|
|       1|     30| 12|     90|      5|
|       2|      5| 10|     80|      1|
|       2|     10|  4|    900|      2|
|       2|     15| 21|     60|      3|
|       2|     25| 30|     40|      4|
+--------+-------+---+-------+-------+

Now, within each window, when a row has aux equal to 4, I want to set FECMVTO to that row's IND_DEF value, for that row and every following row until the end of the window.

The resulting DF would be:

+--------+-------+---+-------+-------+
|policyId|FECMVTO|aux|IND_DEF|row_num|
+--------+-------+---+-------+-------+
|       1|      1|  7|     10|      1|
|       1|      3| 14|     50|      2|
|       1|    300|  4|    300|      3|
|       1|    300| 24|     70|      4|
|       1|    300| 12|     90|      5|
|       2|      5| 10|     80|      1|
|       2|    900|  4|    900|      2|
|       2|    900| 21|     60|      3|
|       2|    900| 30|     40|      4|
+--------+-------+---+-------+-------+

Thanks for your suggestions, I'm quite stuck here...

scala apache-spark window
1 Answer

Here is one approach: first, left-join the DataFrame with a version of itself filtered to aux == 4; then apply the window function first (with ignoreNulls) to fill the resulting null values of IND_DEF forward within each partition; finally, conditionally recreate the column FECMVTO with coalesce.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1,1,7,10), (1,10,4,300), (1,3,14,50), (1,20,24,70), (1,30,12,90), 
  (2,10,4,900), (2,25,30,40), (2,15,21,60), (2,5,10,80)
).toDF("policyId","FECMVTO","aux","IND_DEF")

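// cumulative window per policyId, ordered by FECMVTO: from the start of the partition up to the current row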
val win = Window.partitionBy("policyId").orderBy("FECMVTO").
  rowsBetween(Window.unboundedPreceding, 0)

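// keep only the aux == 4 rows; their IND_DEF (renamed IND_DEF2) is the value to propagate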
val df2 = df.
  select($"policyId", $"aux", $"IND_DEF".as("IND_DEF2")).
  where($"aux" === 4)

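// left-join back on (policyId, aux), fill IND_DEF2 forward over the window, then overwrite FECMVTO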
df.join(df2, Seq("policyId", "aux"), "left_outer").
  withColumn("IND_DEF3", first($"IND_DEF2", ignoreNulls=true).over(win)).
  withColumn("FECMVTO", coalesce($"IND_DEF3", $"FECMVTO")).
  show
// +--------+---+-------+-------+--------+--------+
// |policyId|aux|FECMVTO|IND_DEF|IND_DEF2|IND_DEF3|
// +--------+---+-------+-------+--------+--------+
// |       1|  7|      1|     10|    null|    null|
// |       1| 14|      3|     50|    null|    null|
// |       1|  4|    300|    300|     300|     300|
// |       1| 24|    300|     70|    null|     300|
// |       1| 12|    300|     90|    null|     300|
// |       2| 10|      5|     80|    null|    null|
// |       2|  4|    900|    900|     900|     900|
// |       2| 21|    900|     60|    null|     900|
// |       2| 30|    900|     40|    null|     900|
// +--------+---+-------+-------+--------+--------+

The columns IND_DEF2 and IND_DEF3 are kept only for illustration (they can of course be dropped).
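
As a side note, here is a minimal join-free sketch of the same idea, assuming the df and win definitions from the code above are in scope (the helper column name def_from_aux4 is purely illustrative): a conditional expression fed to first over the cumulative window replaces the self-join.

// Backfill value: IND_DEF of the aux == 4 row, carried forward within each window,
// then used to overwrite FECMVTO (coalesce keeps the original value before that row).
val dfAlt = df.
  withColumn("def_from_aux4",
    first(when($"aux" === 4, $"IND_DEF"), ignoreNulls = true).over(win)).
  withColumn("FECMVTO", coalesce($"def_from_aux4", $"FECMVTO")).
  drop("def_from_aux4")

dfAlt.show

This keeps the whole backfill in one chain over each partition and avoids the extra join.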
