我有一个包含4列的csv。该文件包含基于系列的某些缺失行。
输入:-
No A B C
1 10 50 12
3 40 50 12
4 20 60 15
6 80 80 18
输出:-
No A B C
1 10 50 12
2 10 50 12
3 40 50 12
4 20 60 15
5 20 60 15
6 80 80 18
我需要pyspark代码来生成上述输出。
[您可以尝试一下,我基本上使用包含窗口函数lag,lead,max以及何时/否则的逻辑,以获取名为diff2的列,该列与No并排放置所有必要的间隙,然后使用序列创建这些增加的差距,然后将其爆炸以获取输出。我建议您为每个系列都有一个密钥,这样就可以对其进行分区,因为现在window的partitionby为空。
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import when
w=Window().partitionBy().orderBy("No")
w2=Window().partitionBy().orderBy("No").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("number2", F.lag("No").over(w)).withColumn("diff", F.when((F.col("number2").isNotNull()) & ((F.col("No")-F.col("number2")) > 1), (F.col("No")-F.col("number2"))).otherwise(F.lit(0)))\
.withColumn("diff2", F.lead("diff").over(w)).withColumn("diff2", F.when(F.col("diff2").isNull(), F.lit(0)).otherwise(F.col("diff2"))).withColumn("diff2", F.when(F.col("diff2")!=0, F.col("diff2")-1).otherwise(F.col("diff2"))).withColumn("max", F.max("No").over(w2))\
.withColumn("diff2", F.when((F.col("No")==F.col("max")) & (F.col("No")<F.col("max")), F.col("max")-F.col("No")).otherwise(F.col("diff2")))\
.withColumn("number2", F.when(F.col("diff2")!=0,F.expr("""sequence(No,No+diff2,1)""")).otherwise(F.expr("""sequence(No,No+diff2,0)""")))\
.drop("diff","diff2")\
.withColumn("number2", F.explode("number2")).drop("No")\
.select(F.col("number2").alias("No"), "A","B","C")\
.show()
+---+---+---+---+
| No| A| B| C|
+---+---+---+---+
| 1| 10| 50| 12|
| 2| 10| 50| 12|
| 3| 40| 50| 12|
| 4| 20| 60| 15|
| 5| 20| 60| 15|
| 6| 80| 80| 18|
+---+---+---+---+