如何检测pyspark单调减少

问题描述 投票:0回答:1

我正在使用spark DataFrame,在这里我想从特定列中检测任何值,该值不会单调减少。对于这些值,我想根据订购标准将它们替换为以前的值。

这里是一个概念性示例,如果我有一列值[65, 66, 62, 100, 40]。值“ 100”不遵循单调下降趋势,因此应替换为62。因此,结果列表将为[65, 66, 62, 62, 40]

下面是我创建的一些代码,用于检测必须替换的值,但是我不知道如何用前一个替换该值,也不知道如何忽略null中的初始lag值。

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as psf
from pyspark.sql.window import Window

sc = SparkContext(appName="sample-app")
sqlc = SQLContext(sc)

rdd = sc.parallelize([(1, 65), (2, 66), (3, 62), (4, 100), (5, 40)])
df = sqlc.createDataFrame(rdd, ["id", "value"])

window = Window.orderBy(df.id).rowsBetween(-1, -1)
sdf = df.withColumn(
    "__monotonic_col",
    (df.value <= psf.lag(df.value, 1).over(window)) & df.value.isNotNull(),
)


sdf.show()

此代码产生以下输出:

+---+-----+---------------+
| id|value|__monotonic_col|
+---+-----+---------------+
|  1|   65|           null|
|  2|   66|          false|
|  3|   62|           true|
|  4|  100|          false|
|  5|   40|           true|
+---+-----+---------------+
python apache-spark pyspark
1个回答
0
投票

首先,如果我的理解是正确的,难道不应该将66替换为65),因为它没有遵循下降趋势吗?

如果这是正确的解释,那么下面的方法应该起作用(我添加了一个额外的列以使事情保持整洁,但您可以将所有内容包装到一个单独的列创建语句中:]

from pyspark.sql import functions as F

sdf = sdf.withColumn(
    "__monotonic_col_value",
    F.when(
        F.col("__monotonic_col")  | F.col("__monotonic_col").isNull(), df.value)
    .otherwise(
        psf.lag(df.value, 1).over(window)
    ),
)
© www.soinside.com 2019 - 2024. All rights reserved.