Reputation: 19328
I'd like a function that takes a column and a list of bucket ranges as arguments and returns the appropriate bucket label. I want to solve this with the Spark API and don't want to use a UDF.
Let's say we start with this DataFrame (df):
+--------+
|some_num|
+--------+
|       3|
|      24|
|      45|
|    null|
+--------+
Here's the desired behavior of the function:
df.withColumn(
  "bucket",
  bucketFinder(
    col("some_num"),
    Array(
      (0, 10),
      (10, 20),
      (20, 30),
      (30, 70)
    )
  )
).show()
+--------+------+
|some_num|bucket|
+--------+------+
|       3|  0-10|
|      24| 20-30|
|      45| 30-70|
|    null|  null|
+--------+------+
Here is the code I tried that does not work (foreach returns Unit rather than a Column, and s"$res._1" interpolates the whole tuple followed by the literal text "._1", not the tuple's first element):
def bucketFinder(col: Column, buckets: Array[(Any, Any)]): Column = {
  buckets.foreach { res: (Any, Any) =>
    when(col.between(res._1, res._2), lit(s"$res._1 - $res._2"))
  }
}
It's pretty easy to write this code with a UDF, but hard when constrained to only the Spark API.
Upvotes: 1
Views: 1580
Reputation: 19328
Here is a pure Spark solution:
def bucketFinder(col: Column, buckets: Array[(Any, Any)]): Column = {
  val b = buckets.map { res: (Any, Any) =>
    when(col.between(res._1, res._2), lit(s"${res._1}-${res._2}"))
  }
  coalesce(b: _*)
}
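Why this works: each when call without an otherwise clause evaluates to null for rows outside its range, and coalesce returns the first non-null value, i.e. the label of the first matching bucket; a null input matches no when, so it stays null. Here is a minimal, self-contained usage sketch (assuming Spark is on the classpath; the object name, app name, and local master are illustrative, not from the original post):

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit, when}

object BucketDemo {
  // Same bucketFinder as above: each `when` without an `otherwise`
  // is null outside its range, and coalesce keeps the first match.
  def bucketFinder(c: Column, buckets: Array[(Any, Any)]): Column = {
    val b = buckets.map { res =>
      when(c.between(res._1, res._2), lit(s"${res._1}-${res._2}"))
    }
    coalesce(b: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("buckets")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Some(3), Some(24), Some(45), None).toDF("some_num")
    df.withColumn(
      "bucket",
      bucketFinder(col("some_num"), Array((0, 10), (10, 20), (20, 30), (30, 70)))
    ).show()

    spark.stop()
  }
}
```

Note that between is inclusive on both ends, so a value that lies exactly on a shared boundary (e.g. 10) takes the label of the first bucket that contains it, since coalesce stops at the first non-null result.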
I'll leave this question open for a bit to see if someone else has a more elegant solution.
Upvotes: 2
Reputation: 215057
You can divide the column by the bucket size: the floor of the result, multiplied back by the bucket size, gives the bucket's lower bound, and adding the bucket size gives the upper bound:
val bucket_size = 10
val floor_col = floor(df("some_num") / bucket_size) * bucket_size
df.withColumn("bucket", concat_ws("-", floor_col, floor_col + bucket_size)).show
+--------+------+
|some_num|bucket|
+--------+------+
|       3|  0-10|
|      24| 20-30|
+--------+------+
For a bucket size of 5:
val bucket_size1 = 5
val floor_col = floor(df("some_num") / bucket_size1) * bucket_size1
df.withColumn("bucket", concat_ws("-", floor_col, floor_col + bucket_size1)).show
+--------+------+
|some_num|bucket|
+--------+------+
|       3|   0-5|
|      24| 20-25|
+--------+------+
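The floor-based idea can be wrapped in a small helper for any uniform bucket width. This is a sketch, not part of the original answer, and fixedWidthBucket is an illustrative name:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{concat_ws, floor}

// Builds a "lower-upper" label for fixed-width buckets,
// e.g. bucketSize = 10 maps 24 to "20-30".
def fixedWidthBucket(c: Column, bucketSize: Int): Column = {
  val lower = floor(c / bucketSize) * bucketSize
  concat_ws("-", lower, lower + bucketSize)
}
```

Unlike the coalesce-based bucketFinder, this only supports uniform bucket widths, and because concat_ws skips null arguments, a null input produces an empty string rather than null.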
Upvotes: 2