如何汇总数据,为范围(bucketize)?

问题描述 投票:3回答:2

我有这样一个表

+---------------+------+
|id             | value|
+---------------+------+
|               1|118.0|
|               2|109.0|
|               3|113.0|
|               4| 82.0|
|               5| 60.0|
|               6|111.0|
|               7|107.0|
|               8| 84.0|
|               9| 91.0|
|              10|118.0|
+---------------+------+

ANS想骨料或斌值的范围我0,10,20,30,40,...80,90,100,110,120how可以在SQL或多个特定的火花SQL执行此?

目前,我有一个横向视图与范围加入但这似乎相当笨拙/低效。

离散的位数是不是我真正想要的东西,而是一个与此范围CUT

edit

https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache/spark/ml/feature/Binning.scala将执行动态垃圾桶,但我宁愿需要这个指定的范围。

sql apache-spark apache-spark-sql
2个回答
3
投票

尝试“GROUP BY”与此

SELECT id, (value DIV 10)*10 FROM table_name ;

将使用DataSet API斯卡拉以下内容:

df.select(('value divide 10).cast("int")*10)

10
投票

在一般情况下,可使用org.apache.spark.ml.feature.Bucketizer来执行静态合并:

val df = Seq(
  (1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0),
  (6, 111.0), (7, 107.0), (8,  84.0), (9, 91.0), (10, 118.0)
).toDF("id", "value")

val splits = (0 to 12).map(_ * 10.0).toArray

import org.apache.spark.ml.feature.Bucketizer
val bucketizer = new Bucketizer()
  .setInputCol("value")
  .setOutputCol("bucket")
  .setSplits(splits)

val bucketed = bucketizer.transform(df)

val solution = bucketed.groupBy($"bucket").agg(count($"id") as "count")

结果:

scala> solution.show
+------+-----+
|bucket|count|
+------+-----+
|   8.0|    2|
|  11.0|    4|
|  10.0|    2|
|   6.0|    1|
|   9.0|    1|
+------+-----+

当值位于所定义的二进制位外的bucketizer引发错误。它可以定义分割点为Double.NegativeInfinityDouble.PositiveInfinity捕捉到异常。

Bucketizer被设计为与任意拆分通过执行二进制搜索权斗的提高工作效率。在普通垃圾箱像你这样的情况下,可以简单地这样做:

val binned = df.withColumn("bucket", (($"value" - bin_min) / bin_width) cast "int")

其中bin_minbin_width分别是最小bin和所述箱宽度的左间隔。

© www.soinside.com 2019 - 2024. All rights reserved.