Reputation: 17704
I have a table like
+---------------+------+
|id | value|
+---------------+------+
| 1|118.0|
| 2|109.0|
| 3|113.0|
| 4| 82.0|
| 5| 60.0|
| 6|111.0|
| 7|107.0|
| 8| 84.0|
| 9| 91.0|
| 10|118.0|
+---------------+------+
ans would like aggregate or bin the values to a range 0,10,20,30,40,...80,90,100,110,120
how can I perform this in SQL or more specific spark sql?
Currently I have a lateral view join with the range but this seems rather clumsy / inefficient.
The quantile discretized is not really what I want, rather a CUT
with this range.
https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache/spark/ml/feature/Binning.scala would perform dynamic bins, but I would rather need this specified range.
Upvotes: 3
Views: 8019
Reputation: 10235
Try "GROUP BY" with this
SELECT id, (value DIV 10)*10 FROM table_name ;
The following would be using the Dataset API for Scala:
df.select(('value divide 10).cast("int")*10)
Upvotes: 3
Reputation: 74455
In the general case, static binning can be performed using org.apache.spark.ml.feature.Bucketizer:
val df = Seq(
(1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0),
(6, 111.0), (7, 107.0), (8, 84.0), (9, 91.0), (10, 118.0)
).toDF("id", "value")
val splits = (0 to 12).map(_ * 10.0).toArray
import org.apache.spark.ml.feature.Bucketizer
val bucketizer = new Bucketizer()
.setInputCol("value")
.setOutputCol("bucket")
.setSplits(splits)
val bucketed = bucketizer.transform(df)
val solution = bucketed.groupBy($"bucket").agg(count($"id") as "count")
Result:
scala> solution.show
+------+-----+
|bucket|count|
+------+-----+
| 8.0| 2|
| 11.0| 4|
| 10.0| 2|
| 6.0| 1|
| 9.0| 1|
+------+-----+
The bucketizer throws errors when values lie outside the defined bins. It is possible to define split points as Double.NegativeInfinity
or Double.PositiveInfinity
to capture outliers.
Bucketizer
is designed to work efficiently with arbitrary splits by performing binary search of the right bucket. In the case of regular bins like yours, one can simply do something like:
val binned = df.withColumn("bucket", (($"value" - bin_min) / bin_width) cast "int")
where bin_min
and bin_width
are the left interval of the minimum bin and the bin width, respectively.
Upvotes: 15