不支持中值窗函数?

问题描述 投票:0回答:1

它看起来像窗口函数上的

median
不支持,但不知何故错误消息没有明确说明这一点。还有另一种方法可以计算滚动窗口的中位数吗?

import pyspark # 3.4.1
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (4, 5),
        (3, 10),
        (2, 15),
        (1, 20)
    ],
    ('id1', 'v1')
)
df.createOrReplaceTempView("x")
spark.sql('select median(v1) over (order by id1 rows between 2 preceding and current row) as v1 from x').collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/sql/session.py", line 1440, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
  File "/home/jan/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/jan/.local/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Cannot specify order by or frame for 'median'.; line 1 pos 7;
Project [v1#4]
+- Project [v1#1L, id1#0L, v1#4, v1#4]
   +- Window [median(v1#1L) windowspecdefinition(id1#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS v1#4], [id1#0L ASC NULLS FIRST]
      +- Project [v1#1L, id1#0L]
         +- SubqueryAlias x
            +- View (`x`, [id1#0L,v1#1L])
               +- LogicalRDD [id1#0L, v1#1L], false

删除

order by id1
,这显然是不可取的,但没有任何区别。

apache-spark pyspark window-functions
1个回答
0
投票

不直接支持在 PySpark 中计算窗口上的中位数。 主要原因是中位数需要对数据进行排序,而排序是不可并行的操作,导致在 Spark 等分布式环境中计算效率低下。

但是,有一些解决方法可以实现这一目标。您可以使用值为 0.5 的percentile_approx,这是中位数的近似版本:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (4, 5),
        (3, 10),
        (2, 15),
        (1, 20)
    ],
    ('id1', 'v1')
)


window = Window.orderBy('id1').rowsBetween(-2, 0)

df = df.withColumn('median_approx', F.expr('percentile_approx(v1, 0.5)') over window)
df.show()

这将计算每行窗口上的近似中位数。请注意,结果是近似值,但出于许多实际目的,它应该足够接近。

PySpark 的窗口操作不支持需要对窗口数据进行全排序或完整扫描的操作,例如中位数或其他基于排名的统计数据。这些操作不容易并行化,并且需要对数据进行混洗才能将窗口的所有数据放在一起,这在 Spark 等分布式环境中可能是一项成本高昂的操作。即使使用 Vectorized/PANDAS_UDF,您也会面临同样的问题。

© www.soinside.com 2019 - 2024. All rights reserved.