date

问题描述 投票:0回答:1

我有一个数据框架,结构如下。

+----------+------+------+----------------+--------+------+
|      date|market|metric|aggregator_Value|type    |rank  |
+----------+------+------+----------------+--------+------+
|2018-08-05|    m1|   16 |              m1|median  |  1   |
|2018-08-03|    m1|    5 |              m1|median  |  2   |
|2018-08-01|    m1|   10 |              m1|mean    |  3   |
|2018-08-05|    m2|   35 |              m2|mean    |  1   |
|2018-08-03|    m2|   25 |              m2|mean    |  2   |
|2018-08-01|    m2|    5 |              m2|mean    |  3   |
+----------+------+------+----------------+--------+------+

在这个数据框中,等级列是根据日期的顺序和市场列的分组来计算的。

val w_rank = Window.partitionBy("market").orderBy(desc("date"))
val outputDF2=outputDF1.withColumn("rank",rank().over(w_rank))

我想提取在输出数据框中当rank=1时的度量值,条件是如果rank=1行中的type="中位数",那么就将该市场的所有度量值连接起来,否则如果rank=1行中的type="平均值",那么就只连接前两个度量值。

+----------+------+------+----------------+--------+---------+
|      date|market|metric|aggregator_Value|type    |result   |
+----------+------+------+----------------+--------+---------+
|2018-08-05|    m1|   16 |              m1|median  |10|5|16  |
|2018-08-05|    m2|   35 |              m1|mean    |25|35    |
+----------+------+------+----------------+--------+---------+    

我怎样才能实现这个目标 ?

scala apache-spark apache-spark-2.0
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.