Georg Heiler

Reputation: 17704

How to aggregate data into ranges (bucketize)?

I have a table like

+---------------+------+
|             id| value|
+---------------+------+
|              1| 118.0|
|              2| 109.0|
|              3| 113.0|
|              4|  82.0|
|              5|  60.0|
|              6| 111.0|
|              7| 107.0|
|              8|  84.0|
|              9|  91.0|
|             10| 118.0|
+---------------+------+

and would like to aggregate or bin the values into the ranges 0,10,20,30,40,...,80,90,100,110,120. How can I perform this in SQL, or more specifically Spark SQL?

Currently I have a lateral view join against the range, but this seems rather clumsy and inefficient.
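
For reference, a rough sketch of what that looks like (the table name and the hard-coded bucket array are placeholders):

spark.sql("""
  SELECT t.id, t.value, b.bucket
  FROM table_name t
  LATERAL VIEW explode(array(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110)) b AS bucket
  WHERE t.value >= b.bucket AND t.value < b.bucket + 10
""")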

The QuantileDiscretizer is not really what I want; rather, I need a cut with this fixed range.

edit

https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache/spark/ml/feature/Binning.scala performs dynamic binning, but I need this fixed, specified range instead.

Upvotes: 3

Views: 8019

Answers (2)

Bertram Gilfoyle

Reputation: 10235

Try "GROUP BY" with this

SELECT id, (value DIV 10) * 10 AS bucket FROM table_name;
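
The query above only computes the bucket for each row; to actually aggregate per range, a grouped version could look like this sketch (the aliases are my own, reusing the answer's DIV expression):

spark.sql("""
  SELECT (value DIV 10) * 10 AS bucket, COUNT(id) AS cnt
  FROM table_name
  GROUP BY (value DIV 10) * 10
""").show()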

The following uses the Scala Dataset API:

import spark.implicits._  // enables the $"..." column syntax
df.select(($"value" / 10).cast("int") * 10 as "bucket")
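
The same aggregation in the Dataset API might look like this (a sketch; the column alias is my choice):

df.select(($"value" / 10).cast("int") * 10 as "bucket")
  .groupBy($"bucket")
  .count()
  .show()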

Upvotes: 3

Hristo Iliev

Reputation: 74455

In the general case, static binning can be performed using org.apache.spark.ml.feature.Bucketizer:

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.functions.count
import spark.implicits._

val df = Seq(
  (1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0),
  (6, 111.0), (7, 107.0), (8, 84.0), (9, 91.0), (10, 118.0)
).toDF("id", "value")

// split points 0.0, 10.0, ..., 120.0 define 12 buckets
val splits = (0 to 12).map(_ * 10.0).toArray

val bucketizer = new Bucketizer()
  .setInputCol("value")
  .setOutputCol("bucket")
  .setSplits(splits)

val bucketed = bucketizer.transform(df)

val solution = bucketed.groupBy($"bucket").agg(count($"id") as "count")

Result:

scala> solution.show
+------+-----+
|bucket|count|
+------+-----+
|   8.0|    2|
|  11.0|    4|
|  10.0|    2|
|   6.0|    1|
|   9.0|    1|
+------+-----+

Bucketizer throws an error when values lie outside the defined bins. Split points can be defined as Double.NegativeInfinity or Double.PositiveInfinity to capture such outliers.
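
A minimal sketch of such splits (the variable names are my own), reusing the splits array from above; the two extra buckets catch values below 0 and above 120:

// two extra catch-all buckets at both ends
val splitsWithOutliers =
  (Double.NegativeInfinity +: splits) :+ Double.PositiveInfinity

val safeBucketizer = new Bucketizer()
  .setInputCol("value")
  .setOutputCol("bucket")
  .setSplits(splitsWithOutliers)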

Bucketizer is designed to work efficiently with arbitrary splits by performing a binary search for the right bucket. For regular bins like yours, one can simply do something like:

val binned = df.withColumn("bucket", (($"value" - bin_min) / bin_width) cast "int")

where bin_min and bin_width are the left edge of the lowest bin and the bin width, respectively.
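
With the question's ranges this reduces to bin_min = 0 and bin_width = 10; a quick sketch reproducing the counts from above (with integer bucket indices instead of Doubles):

val bin_min = 0.0
val bin_width = 10.0

val binned = df.withColumn("bucket", (($"value" - bin_min) / bin_width) cast "int")
binned.groupBy($"bucket").agg(count($"id") as "count").show()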

Upvotes: 15
