场景:
我有以下数据框如下
``` -- -----------------------------------
companyId | calc_date | mean |
----------------------------------
1111 | 01-08-2002 | 15 |
----------------------------------
1111 | 02-08-2002 | 16.5 |
----------------------------------
1111 | 03-08-2002 | 17 |
----------------------------------
1111 | 04-08-2002 | 15 |
----------------------------------
1111 | 05-08-2002 | 23 |
----------------------------------
1111 | 06-08-2002 | 22.6 |
----------------------------------
1111 | 07-08-2002 | 25 |
----------------------------------
1111 | 08-08-2002 | 15 |
----------------------------------
1111 | 09-08-2002 | 15 |
----------------------------------
1111 | 10-08-2002 | 16.5 |
----------------------------------
1111 | 11-08-2002 | 22.6 |
----------------------------------
1111 | 12-08-2002 | 15 |
----------------------------------
1111 | 13-08-2002 | 16.5 |
----------------------------------
1111 | 14-08-2002 | 25 |
----------------------------------
1111 | 15-08-2002 | 16.5 |
----------------------------------
```
要求:
需要计算每个公司的每个记录的5天平均值,10天平均值,15天平均值。
5 day-mean --> Previous 5 days available mean sum
10 day-mean --> Previous 10 days available mean sum
15 day-mean --> Previous 15 days available mean sum
结果数据框应该具有如下所示的calulated列
----------------------------------------------------------------------------
companyId | calc_date | mean | 5 day-mean | 10-day mean | 15-day mean |
----------------------------------------------------------------------------
题 : 怎么做到这一点?在火花中做到这一点的最佳方法是什么?
这是一种使用公司的Window分区通过n-day mean
计算指定时间戳范围内当前行和前一行之间的rangeBetween
的方法,如下所示(使用虚拟数据集):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val df = (1 to 3).flatMap(i => Seq.tabulate(15)(j => (i, s"${j+1}-2-2019", j+1))).
toDF("company_id", "calc_date", "mean")
df.show
// +----------+---------+----+
// |company_id|calc_date|mean|
// +----------+---------+----+
// | 1| 1-2-2019| 1|
// | 1| 2-2-2019| 2|
// | 1| 3-2-2019| 3|
// | 1| 4-2-2019| 4|
// | 1| 5-2-2019| 5|
// | ... |
// | 1|14-2-2019| 14|
// | 1|15-2-2019| 15|
// | 2| 1-2-2019| 1|
// | 2| 2-2-2019| 2|
// | 2| 3-2-2019| 3|
// | ... |
// +----------+---------+----+
def winSpec = Window.partitionBy("company_id").orderBy("ts")
def dayRange(days: Int) = winSpec.rangeBetween(-(days * 24 * 60 * 60), 0)
df.
withColumn("ts", unix_timestamp(to_date($"calc_date", "d-M-yyyy"))).
withColumn("mean-5", mean($"mean").over(dayRange(5))).
withColumn("mean-10", mean($"mean").over(dayRange(10))).
withColumn("mean-15", mean($"mean").over(dayRange(15))).
show
// +----------+---------+----+----------+------+-------+-------+
// |company_id|calc_date|mean| ts|mean-5|mean-10|mean-15|
// +----------+---------+----+----------+------+-------+-------+
// | 1| 1-2-2019| 1|1549008000| 1.0| 1.0| 1.0|
// | 1| 2-2-2019| 2|1549094400| 1.5| 1.5| 1.5|
// | 1| 3-2-2019| 3|1549180800| 2.0| 2.0| 2.0|
// | 1| 4-2-2019| 4|1549267200| 2.5| 2.5| 2.5|
// | 1| 5-2-2019| 5|1549353600| 3.0| 3.0| 3.0|
// | 1| 6-2-2019| 6|1549440000| 3.5| 3.5| 3.5|
// | 1| 7-2-2019| 7|1549526400| 4.5| 4.0| 4.0|
// | 1| 8-2-2019| 8|1549612800| 5.5| 4.5| 4.5|
// | 1| 9-2-2019| 9|1549699200| 6.5| 5.0| 5.0|
// | 1|10-2-2019| 10|1549785600| 7.5| 5.5| 5.5|
// | 1|11-2-2019| 11|1549872000| 8.5| 6.0| 6.0|
// | 1|12-2-2019| 12|1549958400| 9.5| 7.0| 6.5|
// | 1|13-2-2019| 13|1550044800| 10.5| 8.0| 7.0|
// | 1|14-2-2019| 14|1550131200| 11.5| 9.0| 7.5|
// | 1|15-2-2019| 15|1550217600| 12.5| 10.0| 8.0|
// | 3| 1-2-2019| 1|1549008000| 1.0| 1.0| 1.0|
// | 3| 2-2-2019| 2|1549094400| 1.5| 1.5| 1.5|
// | 3| 3-2-2019| 3|1549180800| 2.0| 2.0| 2.0|
// | 3| 4-2-2019| 4|1549267200| 2.5| 2.5| 2.5|
// | 3| 5-2-2019| 5|1549353600| 3.0| 3.0| 3.0|
// +----------+---------+----+----------+------+-------+-------+
// only showing top 20 rows
请注意,如果日期保证是连续的每日时间序列,则可以直接在rowsBetween
上使用rangeBetween
(而不是calc_date
)。