Pyspark 函数减去之前的行

问题描述 投票:0回答:1

在有条件的情况下将滞后/Windows 函数应用于整个数据帧时遇到问题。

我想从第 2 周开始用当前行值 (value2) 减去之前的行值 (value1)。

这是我的数据:

'''

from pyspark.sql import functions as f

data = [
    (1, 1, 1),
    (2, 0, 5),
    (3, 0, 10),
    (4, 0, 20),
    (5, 0, 30),
    (6, 0, 40)
]
columns = ["week", "value1", "value2"]
df = spark.createDataFrame(data, columns)

'''

这是我进行计算的逻辑:

'''

w=Window.orderBy("week")
                          
df2 = df.withColumn('value1',
                    f.when((f.col('week') > 1),
                           f.lag(df['value1']).over(w) - df['value2'] 
                          ).otherwise(
                        f.col('value1')
                    )
                   )

'''

这不适用于整个数据框

有人可以帮我吗?

pyspark
1个回答
0
投票

我认为这可以解决您的问题:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

data = [
    (1, 1, 1),
    (2, 0, 5),
    (3, 0, 10),
    (4, 0, 20),
    (5, 0, 30),
    (6, 0, 40)
]
columns = ["week", "value1", "value2"]
df = spark.createDataFrame(data, columns)

w = Window.orderBy("week")

df2 = df.withColumn("value1",
    f.when(f.col("week") > 1,
        f.lag("value1").over(w) - f.col("value2")
    ).otherwise(f.col("value1"))
)
© www.soinside.com 2019 - 2024. All rights reserved.