Finding percentiles dynamically in Spark-Scala

Problem description

I am trying to compute percentiles on a column using a Window function as shown below. I referred to here for the ApproxQuantile definition applied over a group.

val df1 = Seq((1,10.0),(1,20.0),(1,40.6),(1,15.6),(1,17.6),(1,25.6),(1,39.6),(2,20.5),(2,70.3),(2,69.4),(2,74.4),(2,45.4),(3,60.6),(3,80.6),(4,30.6),(4,90.6)).toDF("ID","Count")

val idBucketMapping = Seq((1,4),(2,3),(3,2),(4,2)).toDF("ID","Bucket")

// adapted from jpp's answer referenced above
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Wrapper that exposes Catalyst's ApproximatePercentile expression as a Column
// aggregate, so it can be used in DataFrame and Window expressions.
object PercentileApprox {
  def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
    val expr = new ApproximatePercentile(col.expr, percentage.expr, accuracy.expr).toAggregateExpression
    new Column(expr)
  }

  // Overload that falls back to the default accuracy.
  def percentile_approx(col: Column, percentage: Column): Column =
    percentile_approx(col, percentage, lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
}
import PercentileApprox._
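A quick sanity check of the wrapper (not part of the original post; it only reuses df1 and the wrapper above): used as an ordinary aggregate, it returns for example an approximate median of Count per ID.

// Hedged sketch: approximate median (50th percentile) of Count for each ID.
df1.groupBy("ID")
  .agg(percentile_approx(col("Count"), lit(0.5)).as("approx_median"))
  .show()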

// Lower-edge percentages for bucket_size buckets: 0, 1/b, 2/b, ..., (b-1)/b
def doBucketing(bucket_size: Int): Seq[Double] =
  (1 until bucket_size).scanLeft(0d)((a, _) => a + (1 / bucket_size.toDouble))

var res = df1.withColumn("percentile",
  percentile_approx(col("Count"), typedLit(doBucketing(2))).over(Window.partitionBy("ID")))
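For reference (a quick check, not in the original post), these are the percentage boundaries that doBucketing hands to percentile_approx for a couple of bucket sizes:

doBucketing(2)  // 0.0, 0.5
doBucketing(4)  // 0.0, 0.25, 0.5, 0.75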

scala> df1.show
+---+-----+
| ID|Count|
+---+-----+
|  1| 10.0|
|  1| 20.0|
|  1| 40.6|
|  1| 15.6|
|  1| 17.6|
|  1| 25.6|
|  1| 39.6|
|  2| 20.5|
|  2| 70.3|
|  2| 69.4|
|  2| 74.4|
|  2| 45.4|
|  3| 60.6|
|  3| 80.6|
|  4| 30.6|
|  4| 90.6|
+---+-----+


scala> idBucketMapping.show
+---+------+
| ID|Bucket|
+---+------+
|  1|     4|
|  2|     3|
|  3|     2|
|  4|     2|
+---+------+


scala> res.show
+---+-----+------------------+
| ID|Count|        percentile|
+---+-----+------------------+
|  1| 10.0|[10.0, 20.0, 40.6]|
|  1| 20.0|[10.0, 20.0, 40.6]|
|  1| 40.6|[10.0, 20.0, 40.6]|
|  1| 15.6|[10.0, 20.0, 40.6]|
|  1| 17.6|[10.0, 20.0, 40.6]|
|  1| 25.6|[10.0, 20.0, 40.6]|
|  1| 39.6|[10.0, 20.0, 40.6]|
|  3| 60.6|[60.6, 60.6, 80.6]|
|  3| 80.6|[60.6, 60.6, 80.6]|
|  4| 30.6|[30.6, 30.6, 90.6]|
|  4| 90.6|[30.6, 30.6, 90.6]|
|  2| 20.5|[20.5, 69.4, 74.4]|
|  2| 70.3|[20.5, 69.4, 74.4]|
|  2| 69.4|[20.5, 69.4, 74.4]|
|  2| 74.4|[20.5, 69.4, 74.4]|
|  2| 45.4|[20.5, 69.4, 74.4]|
+---+-----+------------------+

So far, so good, and the logic is simple. But I need to get the result dynamically: the doBucketing(2) argument passed to this function should be picked from idBucketMapping based on the ID value.

This seems a bit tricky to me. Is it possible?

Expected output - meaning the percentile buckets are based on the idBucketMapping DataFrame.

+---+-----+------------------------+
|ID |Count|percentile              |
+---+-----+------------------------+
|1  |10.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |20.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |40.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |15.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |17.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |25.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |39.6 |[10.0, 15.6, 20.0, 39.6]|
|3  |60.6 |[60.6, 60.6]            |
|3  |80.6 |[60.6, 60.6]            |
|4  |30.6 |[30.6, 30.6]            |
|4  |90.6 |[30.6, 30.6]            |
|2  |20.5 |[20.5, 45.4, 70.3]      |
|2  |70.3 |[20.5, 45.4, 70.3]      |
|2  |69.4 |[20.5, 45.4, 70.3]      |
|2  |74.4 |[20.5, 45.4, 70.3]      |
|2  |45.4 |[20.5, 45.4, 70.3]      |
+---+-----+------------------------+
Tags: scala, apache-spark, window-functions, median, percentile
1 Answer

percentile_approx takes a percentage and an accuracy. It seems that both of them must be constant literals, so we cannot call percentile_approx at runtime with a dynamically computed percentage or accuracy.
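One possible direction (a hedged sketch, not part of this answer, reusing the question's df1, idBucketMapping, doBucketing and percentile_approx wrapper): compute the percentiles in one pass per distinct bucket size and union the results, so that each individual percentile_approx call still receives a constant literal percentage list.

// Sketch only, assuming df1, idBucketMapping, doBucketing and the
// percentile_approx wrapper from the question are already in scope.
// Distinct bucket sizes present in the mapping.
val bucketSizes = idBucketMapping.select("Bucket").distinct().collect().map(_.getInt(0))

val resDynamic = bucketSizes.map { b =>
  // Restrict df1 to the IDs that use this bucket size, then apply the
  // windowed percentile with a constant percentage list for this pass.
  val idsForBucket = idBucketMapping.filter(col("Bucket") === b).select("ID")
  df1.join(idsForBucket, Seq("ID"))
    .withColumn("percentile",
      percentile_approx(col("Count"), typedLit(doBucketing(b)))
        .over(Window.partitionBy("ID")))
}.reduce(_ union _)

resDynamic.show(false)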
