如何计算给定数据的5天平均值,10天平均值和15天平均值?

问题描述 投票:-2回答:1

场景:

我有以下数据框如下

```     -- -----------------------------------
        companyId | calc_date   | mean   |
        ----------------------------------
        1111      | 01-08-2002  |  15    |
        ----------------------------------
        1111      | 02-08-2002  |  16.5   |
        ----------------------------------
        1111      | 03-08-2002  |  17     |
        ----------------------------------
        1111      | 04-08-2002  |  15     |
        ----------------------------------
        1111      | 05-08-2002  |  23     |
        ----------------------------------
        1111      | 06-08-2002  |  22.6   |
        ----------------------------------
        1111      | 07-08-2002  |  25     | 
        ----------------------------------
        1111      | 08-08-2002  |  15     |
        ----------------------------------
        1111      | 09-08-2002  |  15     |
        ----------------------------------
        1111      | 10-08-2002  |  16.5   |
        ----------------------------------
        1111      | 11-08-2002  |  22.6   |
        ----------------------------------
        1111      | 12-08-2002  |  15     |
        ----------------------------------
        1111      | 13-08-2002  |  16.5   |
        ----------------------------------
        1111      | 14-08-2002  |  25     |
        ----------------------------------
        1111      | 15-08-2002  |  16.5   |
        ----------------------------------

```

要求:

需要计算每个公司的每个记录的5天平均值,10天平均值,15天平均值。

5 day-mean   -->  Previous 5 days available mean sum
10 day-mean  --> Previous 10 days available mean sum
15 day-mean  --> Previous 15 days available mean sum

结果数据框应该具有如下所示的calulated列

        ----------------------------------------------------------------------------
        companyId | calc_date   | mean   |  5 day-mean | 10-day mean | 15-day mean |
        ----------------------------------------------------------------------------

题 : 怎么做到这一点?在火花中做到这一点的最佳方法是什么?

scala apache-spark apache-spark-sql spark-streaming
1个回答
2
投票

这是一种使用公司的Window分区通过n-day mean计算指定时间戳范围内当前行和前一行之间的rangeBetween的方法,如下所示(使用虚拟数据集):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = (1 to 3).flatMap(i => Seq.tabulate(15)(j => (i, s"${j+1}-2-2019", j+1))).
  toDF("company_id", "calc_date", "mean")

df.show
// +----------+---------+----+
// |company_id|calc_date|mean|
// +----------+---------+----+
// |         1| 1-2-2019|   1|
// |         1| 2-2-2019|   2|
// |         1| 3-2-2019|   3|
// |         1| 4-2-2019|   4|
// |         1| 5-2-2019|   5|
// |         ...             |
// |         1|14-2-2019|  14|
// |         1|15-2-2019|  15|
// |         2| 1-2-2019|   1|
// |         2| 2-2-2019|   2|
// |         2| 3-2-2019|   3|
// |         ...             |
// +----------+---------+----+

def winSpec = Window.partitionBy("company_id").orderBy("ts")
def dayRange(days: Int) = winSpec.rangeBetween(-(days * 24 * 60 * 60), 0)

df.
  withColumn("ts", unix_timestamp(to_date($"calc_date", "d-M-yyyy"))).
  withColumn("mean-5", mean($"mean").over(dayRange(5))).
  withColumn("mean-10", mean($"mean").over(dayRange(10))).
  withColumn("mean-15", mean($"mean").over(dayRange(15))).
  show
// +----------+---------+----+----------+------+-------+-------+
// |company_id|calc_date|mean|        ts|mean-5|mean-10|mean-15|
// +----------+---------+----+----------+------+-------+-------+
// |         1| 1-2-2019|   1|1549008000|   1.0|    1.0|    1.0|
// |         1| 2-2-2019|   2|1549094400|   1.5|    1.5|    1.5|
// |         1| 3-2-2019|   3|1549180800|   2.0|    2.0|    2.0|
// |         1| 4-2-2019|   4|1549267200|   2.5|    2.5|    2.5|
// |         1| 5-2-2019|   5|1549353600|   3.0|    3.0|    3.0|
// |         1| 6-2-2019|   6|1549440000|   3.5|    3.5|    3.5|
// |         1| 7-2-2019|   7|1549526400|   4.5|    4.0|    4.0|
// |         1| 8-2-2019|   8|1549612800|   5.5|    4.5|    4.5|
// |         1| 9-2-2019|   9|1549699200|   6.5|    5.0|    5.0|
// |         1|10-2-2019|  10|1549785600|   7.5|    5.5|    5.5|
// |         1|11-2-2019|  11|1549872000|   8.5|    6.0|    6.0|
// |         1|12-2-2019|  12|1549958400|   9.5|    7.0|    6.5|
// |         1|13-2-2019|  13|1550044800|  10.5|    8.0|    7.0|
// |         1|14-2-2019|  14|1550131200|  11.5|    9.0|    7.5|
// |         1|15-2-2019|  15|1550217600|  12.5|   10.0|    8.0|
// |         3| 1-2-2019|   1|1549008000|   1.0|    1.0|    1.0|
// |         3| 2-2-2019|   2|1549094400|   1.5|    1.5|    1.5|
// |         3| 3-2-2019|   3|1549180800|   2.0|    2.0|    2.0|
// |         3| 4-2-2019|   4|1549267200|   2.5|    2.5|    2.5|
// |         3| 5-2-2019|   5|1549353600|   3.0|    3.0|    3.0|
// +----------+---------+----+----------+------+-------+-------+
// only showing top 20 rows

请注意,如果日期保证是连续的每日时间序列,则可以直接在rowsBetween上使用rangeBetween(而不是calc_date)。

© www.soinside.com 2019 - 2024. All rights reserved.