AnalysisException: expression not supported within a window function when computing a column value based on the previous row

Problem description

I have sample data with four amount fields: amt1, amt2, amt3, and amt4. I want to compute amt5 for each row as sum(amt1, amt2, amt3, amt4) plus the previous row's amt5 value, ordered by ids. For example, the first row's amt5 is 1+2+3+4 = 10, and the second row's is (1+2+3+4) + 10 = 20.

Suppose the dataset is as follows:

+----+----+----+----+---+
|amt1|amt2|amt3|amt4|ids|
+----+----+----+----+---+
|   1|   2|   3|   4|  1|
|   1|   2|   3|   4|  2|
|   1|   2|   3|   4|  3|
|   1|   2|   3|   4|  4|
|   1|   2|   3|   4|  5|
|   1|   2|   3|   4|  6|
+----+----+----+----+---+

Below is the output I expect:

+----+----+----+----+---+----+
|amt1|amt2|amt3|amt4|ids|amt5|
+----+----+----+----+---+----+
|   1|   2|   3|   4|  1|  10|
|   1|   2|   3|   4|  2|  20|
|   1|   2|   3|   4|  3|  30|
|   1|   2|   3|   4|  4|  40|
+----+----+----+----+---+----+

Below is the code I tried:

from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Plain Python helper that adds its arguments together.
# Note: this shadows Python's built-in sum().
def sum(*col):
  total = 0
  for i in col:
    total = total + i
  return total

# sc and spark are the SparkContext and SparkSession provided by the shell.
rdd = sc.parallelize(["1,1,2,3,4", "2,1,2,3,4", "3,1,2,3,4", "4,1,2,3,4", "5,1,2,3,4", "6,1,2,3,4"])
finalRdd = rdd.map(lambda t: t.split(",")).map(lambda t: Row(ids=t[0], amt1=t[1], amt2=t[2], amt3=t[3], amt4=t[4]))
df = spark.createDataFrame(finalRdd)

w = Window.orderBy("ids").rowsBetween(
    Window.unboundedPreceding,  # take all rows from the beginning of the frame
    Window.currentRow)          # up to the current row

# Per-row total of the four amount columns (this works: it just builds a column expression).
df1 = df.withColumn("amt5", sum(df.amt1, df.amt2, df.amt3, df.amt4))
# This line raises the AnalysisException shown below.
df1.withColumn("amt5", sum(df1.amt5).over(w)).show()

And here is the exception raised when the above code is executed:

py4j.protocol.Py4JJavaError: An error occurred while calling o121.withColumn.
: org.apache.spark.sql.AnalysisException: Expression '(amt5#11 + cast(0 as double))' not supported within a window function.;;
Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, amt5#11, total#19]
+- Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, amt5#11, total#19, total#19]
   +- Window [(amt5#11 + cast(0 as double)) windowspecdefinition(ids#4 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total#19], [ids#4 ASC NULLS FIRST]
      +- Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, amt5#11]
         +- Project [amt1#0, amt2#1, amt3#2, amt4#3, ids#4, ((((cast(amt1#0 as double) + cast(0 as double)) + cast(amt2#1 as double)) + cast(amt3#2 as double)) + cast(amt4#3 as double)) AS amt5#11]
            +- LogicalRDD [amt1#0, amt2#1, amt3#2, amt4#3, ids#4]
Tags: python, apache-spark, pyspark, apache-spark-sql, pyspark-sql
1 Answer

You have a name collision on sum. The call resolves to your own Python helper, which eagerly builds the plain column expression amt5 + 0 (visible in the plan above) rather than an aggregate, and only aggregate or window functions are allowed inside .over(). The window aggregate should come from the pyspark.sql.functions package, so call it like this:

df1.withColumn("amt5",func.sum(df1.amt5).over(w)).show()