我正在使用spark DataFrame
,在这里我想从特定列中检测任何值,该值不会单调减少。对于这些值,我想根据订购标准将它们替换为以前的值。
这里是一个概念性示例,如果我有一列值[65, 66, 62, 100, 40]
。值“ 100”不遵循单调下降趋势,因此应替换为62。因此,结果列表将为[65, 66, 62, 62, 40]
。
下面是我创建的一些代码,用于检测必须替换的值,但是我不知道如何用前一个替换该值,也不知道如何忽略null
中的初始lag
值。
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as psf
from pyspark.sql.window import Window
sc = SparkContext(appName="sample-app")
sqlc = SQLContext(sc)
rdd = sc.parallelize([(1, 65), (2, 66), (3, 62), (4, 100), (5, 40)])
df = sqlc.createDataFrame(rdd, ["id", "value"])
window = Window.orderBy(df.id).rowsBetween(-1, -1)
sdf = df.withColumn(
"__monotonic_col",
(df.value <= psf.lag(df.value, 1).over(window)) & df.value.isNotNull(),
)
sdf.show()
此代码产生以下输出:
+---+-----+---------------+
| id|value|__monotonic_col|
+---+-----+---------------+
| 1| 65| null|
| 2| 66| false|
| 3| 62| true|
| 4| 100| false|
| 5| 40| true|
+---+-----+---------------+
首先,如果我的理解是正确的,难道不应该将66替换为65),因为它没有遵循下降趋势吗?
如果这是正确的解释,那么下面的方法应该起作用(我添加了一个额外的列以使事情保持整洁,但您可以将所有内容包装到一个单独的列创建语句中:]
from pyspark.sql import functions as F
sdf = sdf.withColumn(
"__monotonic_col_value",
F.when(
F.col("__monotonic_col") | F.col("__monotonic_col").isNull(), df.value)
.otherwise(
psf.lag(df.value, 1).over(window)
),
)