它看起来像窗口函数上的
median
不支持,但不知何故错误消息没有明确说明这一点。还有另一种方法可以计算滚动窗口的中位数吗?
import pyspark # 3.4.1
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(4, 5),
(3, 10),
(2, 15),
(1, 20)
],
('id1', 'v1')
)
df.createOrReplaceTempView("x")
spark.sql('select median(v1) over (order by id1 rows between 2 preceding and current row) as v1 from x').collect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/jan/.local/lib/python3.8/site-packages/pyspark/sql/session.py", line 1440, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
File "/home/jan/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/jan/.local/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Cannot specify order by or frame for 'median'.; line 1 pos 7;
Project [v1#4]
+- Project [v1#1L, id1#0L, v1#4, v1#4]
+- Window [median(v1#1L) windowspecdefinition(id1#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS v1#4], [id1#0L ASC NULLS FIRST]
+- Project [v1#1L, id1#0L]
+- SubqueryAlias x
+- View (`x`, [id1#0L,v1#1L])
+- LogicalRDD [id1#0L, v1#1L], false
删除
order by id1
,这显然是不可取的,但没有任何区别。
不直接支持在 PySpark 中计算窗口上的中位数。 主要原因是中位数需要对数据进行排序,而排序是不可并行的操作,导致在 Spark 等分布式环境中计算效率低下。
但是,有一些解决方法可以实现这一目标。您可以使用值为 0.5 的percentile_approx,这是中位数的近似版本:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(4, 5),
(3, 10),
(2, 15),
(1, 20)
],
('id1', 'v1')
)
window = Window.orderBy('id1').rowsBetween(-2, 0)
df = df.withColumn('median_approx', F.expr('percentile_approx(v1, 0.5)') over window)
df.show()
这将计算每行窗口上的近似中位数。请注意,结果是近似值,但出于许多实际目的,它应该足够接近。
PySpark 的窗口操作不支持需要对窗口数据进行全排序或完整扫描的操作,例如中位数或其他基于排名的统计数据。这些操作不容易并行化,并且需要对数据进行混洗才能将窗口的所有数据放在一起,这在 Spark 等分布式环境中可能是一项成本高昂的操作。即使使用 Vectorized/PANDAS_UDF,您也会面临同样的问题。