Reputation: 3587
The following script can be used to "count by" keys
val nbr = List(1,2,2,3,3,3,4,4,4,4)
val nbrPairsRDD = sc.parallelize(nbr).map(nbr => (nbr, 1))
val nbrCountsWithReduce = nbrPairsRDD
.reduceByKey(_ + _)
.collect()
nbrCountsWithReduce.foreach(println)
it returns:
(1,1)
(2,2)
(3,3)
(4,4)
How could it be modified to map by range rather than absolute values and give the following output if we had two ranges 1:2 and 3:4:
(1:2,3)
(3:4,7)
Upvotes: 0
Views: 83
Reputation: 3587
One option is to convert the list into double and use the histogram function:
val nbr = List(1,2,2,3,3,3,4,4,4,4)
val nbrPairsRDD = sc.parallelize(nbr).map(_.toDouble).histogram(2)
Upvotes: 1
Reputation: 3966
Here is the code snippet to compute aggregations by range:
val nbr = List(1,2,2,3,3,3,4,4,4,4)
val nbrs = sc.parallelize(nbr)
var lb = 1
var incr = 1
var ub = lb + incr
val nbrsMap = nbrs.map(rec => {
if(rec > ub) {
lb = rec
ub = lb + incr
}
(lb.toString + ":" + ub.toString, 1)
})
nbrsMap.reduceByKey((acc, value) => acc + value).foreach(println)
(1:2,3)
(3:4,7)
Upvotes: 0
Reputation: 4252
One easy way that I can think of is to map the keys to individual ranges, for eg :
val nbrRangePairs = sc.parallelize(nbr)
.map(nbr => (computeRange(nbr), 1))
.reduceByKey(_ + _)
.collect()
// function to compute Ranges
def computeRange(num : int) : String =
{
if(num < 3)
return "1:2"
else if(num < 5)
return "2:3"
else
return "invalid"
}
Upvotes: 0