ulrich

Reputation: 3587

Counting by range

The following script can be used to "count by" key:

val nbr = List(1,2,2,3,3,3,4,4,4,4)
val nbrPairsRDD = sc.parallelize(nbr).map(nbr => (nbr, 1))

val nbrCountsWithReduce = nbrPairsRDD
  .reduceByKey(_ + _)
  .collect()

nbrCountsWithReduce.foreach(println)

It returns:

(1,1)
(2,2)
(3,3)
(4,4)
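To check what reduceByKey(_ + _) computes without a cluster, the same count can be sketched on a plain Scala List. This is only an analogue, not Spark code, and it assumes Scala 2.13+ for groupMapReduce:

```scala
// Plain-collections analogue of the RDD pipeline above (no Spark involved).
val nbr = List(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)

// groupMapReduce(key)(value)(combine): key each element by itself,
// map it to 1, and sum the 1s per key, as reduceByKey(_ + _) does.
val counts: Map[Int, Int] = nbr.groupMapReduce(identity)(_ => 1)(_ + _)

// Sort by key so the printed order is deterministic.
counts.toSeq.sorted.foreach(println)
```

This prints (1,1), (2,2), (3,3) and (4,4), matching the RDD output.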

How could it be modified to count by range rather than by absolute value, so that with two ranges, 1:2 and 3:4, it gives the following output:

(1:2,3)
(3:4,7)

Upvotes: 0

Views: 83

Answers (3)

ulrich

Reputation: 3587

One option is to convert the values to Double and use the histogram function:

val nbr = List(1,2,2,3,3,3,4,4,4,4)
// histogram(2) returns the bucket boundaries and the count in each bucket
val (buckets, counts) = sc.parallelize(nbr).map(_.toDouble).histogram(2)
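Note that histogram(2) splits the span between the minimum and maximum into two equal-width buckets, so for this data the boundaries are 1.0, 2.5 and 4.0 rather than the requested 1:2 and 3:4. Spark's DoubleRDDFunctions also has a histogram overload that takes explicit boundaries: every bucket is half-open [lo, hi) except the last, which also includes its upper bound, so histogram(Array(1.0, 3.0, 4.0)) should cover 1:2 and 3:4 exactly. The sketch below mimics that bucketing rule on a plain Seq so it can be checked without Spark; bucketCounts is a hypothetical helper, not part of the Spark API.

```scala
// Hypothetical helper mimicking Spark's explicit-boundary histogram rule:
// bucket i covers [bounds(i), bounds(i + 1)), except the last bucket,
// which also includes bounds.last. Values outside the range are ignored.
def bucketCounts(values: Seq[Double], bounds: Array[Double]): Array[Long] = {
  val counts = Array.fill(bounds.length - 1)(0L)
  for (v <- values if v >= bounds.head && v <= bounds.last) {
    // Index of the last boundary <= v, clamped so that v == bounds.last
    // falls into the final (closed) bucket.
    val i = math.min(bounds.lastIndexWhere(_ <= v), bounds.length - 2)
    counts(i) += 1
  }
  counts
}

val nbr = List(1, 2, 2, 3, 3, 3, 4, 4, 4, 4).map(_.toDouble)

// Boundaries 1.0, 3.0, 4.0 give [1, 3) and [3, 4], i.e. ranges 1:2 and 3:4.
val counts = bucketCounts(nbr, Array(1.0, 3.0, 4.0))
println(counts.mkString(", "))
```

With this data it prints 3, 7, the counts for 1:2 and 3:4.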

Upvotes: 1

Durga Viswanath Gadiraju

Reputation: 3966

Here is a code snippet that computes counts by range:

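val nbr = List(1,2,2,3,3,3,4,4,4,4)
val nbrs = sc.parallelize(nbr)

// Ranges are consecutive blocks of `width` values starting at lb: 1:2, 3:4, ...
val lb = 1
val width = 2
val nbrsMap = nbrs.map(rec => {
  // Derive the range from the value itself. Updating driver-side vars
  // inside the closure is unreliable: each task works on its own copy.
  val start = lb + ((rec - lb) / width) * width
  (start.toString + ":" + (start + width - 1).toString, 1)
})

// collect() brings the results to the driver before printing
nbrsMap.reduceByKey((acc, value) => acc + value).collect().foreach(println)

Output: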

(1:2,3)
(3:4,7)
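One reason to derive the range from each value, rather than from vars updated while scanning the data, is that Spark ships the closure to every task, so each partition starts from its own copy of lb and ub. The sketch below simulates a scan-based labeller on two hypothetical partitions, [1, 2, 2, 3, 3, 3] and [4, 4, 4, 4], using plain Lists instead of Spark; labelPartition is an illustrative name.

```scala
// Scan-based labelling: the next range starts at the first value above ub.
// Each call gets fresh state, as each Spark task gets its own closure copy.
def labelPartition(part: List[Int]): List[(String, Int)] = {
  var lb = 1
  val incr = 1
  var ub = lb + incr
  part.map { rec =>
    if (rec > ub) {
      lb = rec
      ub = lb + incr
    }
    (s"$lb:$ub", 1)
  }
}

// Hypothetical split of the input into two partitions.
val partitions = List(List(1, 2, 2, 3, 3, 3), List(4, 4, 4, 4))

// Label each partition independently, then sum per key like reduceByKey.
val counts: Map[String, Int] =
  partitions.flatMap(labelPartition).groupMapReduce(_._1)(_._2)(_ + _)

counts.toSeq.sorted.foreach(println)
```

The second partition restarts at lb = 1, ub = 2, so its 4s are labelled 4:5 instead of 3:4, and the totals become (1:2,3), (3:4,3) and (4:5,4) instead of the expected two ranges.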

Upvotes: 0

Kakarot
Kakarot

Reputation: 4252

One easy way I can think of is to map each key to its range, for example:

// Function to compute the range label for a number
// (defined first so it exists when the pipeline below runs in spark-shell)
def computeRange(num: Int): String =
  if (num < 3) "1:2"
  else if (num < 5) "3:4"
  else "invalid"

val nbrRangePairs = sc.parallelize(nbr)
  .map(n => (computeRange(n), 1))
  .reduceByKey(_ + _)
  .collect()
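Hard-coding each range as an if/else branch gets unwieldy as more ranges are added. A sketch of a table-driven variant, assuming the ranges are inclusive (low, high) integer pairs; ranges and rangeLabel are hypothetical names, and a plain List with an extra 7 (to exercise the fallback) stands in for the RDD:

```scala
// Inclusive (low, high) integer ranges; hypothetical example data.
val ranges: Seq[(Int, Int)] = Seq((1, 2), (3, 4))

// Label of the first range containing num, or "invalid" if none does.
def rangeLabel(num: Int): String =
  ranges
    .collectFirst { case (lo, hi) if num >= lo && num <= hi => s"$lo:$hi" }
    .getOrElse("invalid")

val nbr = List(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 7)

// Local stand-in for .map(n => (rangeLabel(n), 1)).reduceByKey(_ + _)
val counts: Map[String, Int] = nbr.groupMapReduce(n => rangeLabel(n))(_ => 1)(_ + _)

counts.toSeq.sorted.foreach(println)
```

Adding a range then only means adding a pair to ranges; in Spark the helper would be used in the map step exactly like computeRange.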

Upvotes: 0
