根据条件创建列并保留先前的值

问题描述 投票:0回答:1

我具有以下按“ col1”排序的数据框。

+----+----+
|col1|col2|
+----+----+
|   a|   x|
|   a|   x|
|   a|   y|
|   b|   x|
|   b|   z|
|   c|   x|
|   c|   y|
|   d|   z|
|   d|   x|
+----+----+

我想以这样的方式添加新列,即“ col3”,即对于“ col1”中唯一组('a','b','c','d')中的每一行,如果('x'或'y')将该值加1,否则如果该值为'z'或任何其他值沿用该值。例如,在第一行中,由于col2为x,因此我们通过加0 + 1 = 1来增加1;在第二行中,由于col2再次为x,我们将增加1 +1 = 2,依此类推。对于第二组,其中col1值为b(第4行),我们从new开始,由于col2值为x,因此我们递增0 + 1 =1。在第5行中,由于col2值为z,因此我们不递增并采用先前的值,即1 对于“ d”(第8行)。因为col2值不在x或y中,所以我们不增加并保留为0。

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   x|   1|
|   a|   x|   2|
|   a|   y|   3|
|   b|   x|   1|
|   b|   z|   1|
|   c|   x|   1|
|   c|   y|   2|
|   d|   z|   0|
|   d|   x|   1|
+----+----+----+

无论如何我都可以在不使用pyspark中的UDF的情况下实现此目的

apache-spark pyspark apache-spark-sql pyspark-sql pyspark-dataframes
1个回答
0
投票

使用窗口对col1进行分区,然后使用条件表达式创建新列。

from pyspark.sql.functions import *
from pyspark.sql import Window

w = Window.partitionBy("col1").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("col3", sum(when(col("col2").isin("x", "y"), 1).otherwise(0)).over(w)).orderBy("col1").show(10)

代码的结果正是您想要的。

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   x|   1|
|   a|   x|   2|
|   a|   y|   3|
|   b|   x|   1|
|   b|   z|   1|
|   c|   x|   1|
|   c|   y|   2|
|   d|   z|   0|
|   d|   x|   1|
+----+----+----+
© www.soinside.com 2019 - 2024. All rights reserved.