我有一个数据框架,结构如下。
+----------+------+------+----------------+--------+------+
| date|market|metric|aggregator_Value|type |rank |
+----------+------+------+----------------+--------+------+
|2018-08-05| m1| 16 | m1|median | 1 |
|2018-08-03| m1| 5 | m1|median | 2 |
|2018-08-01| m1| 10 | m1|mean | 3 |
|2018-08-05| m2| 35 | m2|mean | 1 |
|2018-08-03| m2| 25 | m2|mean | 2 |
|2018-08-01| m2| 5 | m2|mean | 3 |
+----------+------+------+----------------+--------+------+
在这个数据框中,等级列是根据日期的顺序和市场列的分组来计算的。
val w_rank = Window.partitionBy("market").orderBy(desc("date"))
val outputDF2=outputDF1.withColumn("rank",rank().over(w_rank))
我想提取在输出数据框中当rank=1时的度量值,条件是如果rank=1行中的type="中位数",那么就将该市场的所有度量值连接起来,否则如果rank=1行中的type="平均值",那么就只连接前两个度量值。
+----------+------+------+----------------+--------+---------+
| date|market|metric|aggregator_Value|type |result |
+----------+------+------+----------------+--------+---------+
|2018-08-05| m1| 16 | m1|median |10|5|16 |
|2018-08-05| m2| 35 | m1|mean |25|35 |
+----------+------+------+----------------+--------+---------+
我怎样才能实现这个目标 ?