根据序列填充缺失值,并根据pyspark中的上一行或下一行填充第二行

问题描述 投票:0回答:1

我有一个包含4列的csv。该文件包含基于系列的某些缺失行。

输入:-

No  A   B   C
1   10  50  12
3   40  50  12
4   20  60  15
6   80  80  18

输出:-

No  A   B   C
1   10  50  12
2   10  50  12
3   40  50  12
4   20  60  15
5   20  60  15
6   80  80  18

我需要pyspark代码来生成上述输出。

pyspark pyspark-sql pyspark-dataframes
1个回答
0
投票

[您可以尝试一下,我基本上使用包含窗口函数lag,lead,max以及何时/否则的逻辑,以获取名为diff2的列,该列与No并排放置所有必要的间隙,然后使用序列创建这些增加的差距,然后将其爆炸以获取输出。我建议您为每个系列都有一个密钥,这样就可以对其进行分区,因为现在window的partitionby为空。

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import when
w=Window().partitionBy().orderBy("No")
w2=Window().partitionBy().orderBy("No").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("number2", F.lag("No").over(w)).withColumn("diff", F.when((F.col("number2").isNotNull()) & ((F.col("No")-F.col("number2")) > 1), (F.col("No")-F.col("number2"))).otherwise(F.lit(0)))\
.withColumn("diff2", F.lead("diff").over(w)).withColumn("diff2", F.when(F.col("diff2").isNull(), F.lit(0)).otherwise(F.col("diff2"))).withColumn("diff2", F.when(F.col("diff2")!=0, F.col("diff2")-1).otherwise(F.col("diff2"))).withColumn("max", F.max("No").over(w2))\
.withColumn("diff2", F.when((F.col("No")==F.col("max")) & (F.col("No")<F.col("max")), F.col("max")-F.col("No")).otherwise(F.col("diff2")))\
.withColumn("number2", F.when(F.col("diff2")!=0,F.expr("""sequence(No,No+diff2,1)""")).otherwise(F.expr("""sequence(No,No+diff2,0)""")))\
.drop("diff","diff2")\
.withColumn("number2", F.explode("number2")).drop("No")\
.select(F.col("number2").alias("No"), "A","B","C")\
.show()


+---+---+---+---+
| No|  A|  B|  C|
+---+---+---+---+
|  1| 10| 50| 12|
|  2| 10| 50| 12|
|  3| 40| 50| 12|
|  4| 20| 60| 15|
|  5| 20| 60| 15|
|  6| 80| 80| 18|
+---+---+---+---+
© www.soinside.com 2019 - 2024. All rights reserved.