我具有以下按“ col1”排序的数据框。
+----+----+
|col1|col2|
+----+----+
| a| x|
| a| x|
| a| y|
| b| x|
| b| z|
| c| x|
| c| y|
| d| z|
| d| x|
+----+----+
我想以这样的方式添加新列,即“ col3”,即对于“ col1”中唯一组('a','b','c','d')中的每一行,如果('x'或'y')将该值加1,否则如果该值为'z'或任何其他值沿用该值。例如,在第一行中,由于col2为x,因此我们通过加0 + 1 = 1来增加1;在第二行中,由于col2再次为x,我们将增加1 +1 = 2,依此类推。对于第二组,其中col1值为b(第4行),我们从new开始,由于col2值为x,因此我们递增0 + 1 =1。在第5行中,由于col2值为z,因此我们不递增并采用先前的值,即1 对于“ d”(第8行)。因为col2值不在x或y中,所以我们不增加并保留为0。
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| x| 1|
| a| x| 2|
| a| y| 3|
| b| x| 1|
| b| z| 1|
| c| x| 1|
| c| y| 2|
| d| z| 0|
| d| x| 1|
+----+----+----+
无论如何我都可以在不使用pyspark中的UDF的情况下实现此目的
使用窗口对col1
进行分区,然后使用条件表达式创建新列。
from pyspark.sql.functions import *
from pyspark.sql import Window
w = Window.partitionBy("col1").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("col3", sum(when(col("col2").isin("x", "y"), 1).otherwise(0)).over(w)).orderBy("col1").show(10)
代码的结果正是您想要的。
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| x| 1|
| a| x| 2|
| a| y| 3|
| b| x| 1|
| b| z| 1|
| c| x| 1|
| c| y| 2|
| d| z| 0|
| d| x| 1|
+----+----+----+