SparkSQL - 延迟功能?

问题描述 投票:4回答:1

我在这个DataBricks post中看到,SparkSql中支持窗口函数,特别是我正在尝试使用lag()窗口函数。

我有一排信用卡交易,我已经对它们进行了排序,现在我想迭代这些行,并且每行显示交易金额,以及当前行金额和前一行金额的差异。

在DataBricks帖子之后,我已经提出了这个查询,但它给我一个例外,我不能完全理解为什么......

这是在PySpark中.tx是我已经在注册为临时表时创建的数据帧。

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

和异常(截断)..

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

我真的很了解任何见解,这个功能相当新,而且就现有示例或其他相关帖子而言,还有很多事情要做。

编辑

我还尝试在没有SQL语句的情况下执行此操作,如下所示,但继续出错。我已经将它用于Hive和SQLContext,并收到相同的错误。

windowSpec = \
Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
    tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()

Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
sql apache-spark pyspark apache-spark-sql window-functions
1个回答
4
投票
  1. 帧规范应该以关键字ROWS而不是ROW开头
  2. 帧规范要求下限值 ROWS BETWEEN 1 PRECEDING AND CURRENT ROW UNBOUNDED关键字 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  3. LAG函数根本不接受框架,因此带滞后的正确SQL查询可能如下所示 SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER ( PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ) as prev_amt from tx

编辑:

关于SQL DSL用法:

  1. 正如您可以阅读错误消息 请注意,使用窗口函数当前需要HiveContex 一定要使用sqlContext而不是HiveContext初始化SQLContext
  2. windowSpec.rowsBetween(-1, 0)什么都不做,但lag函数不再支持帧规范。
© www.soinside.com 2019 - 2024. All rights reserved.