abc_spark

Reputation: 383

Histogram issue in Spark (Scala)

+----+----+--------+
| Id | M1 |  trx   |
+----+----+--------+
| 1  | M1 | 11.35  |
| 2  | M1 | 3.4    |
| 3  | M1 | 10.45  |
| 2  | M1 | 3.95   |
| 3  | M1 | 20.95  |
| 2  | M1 | 25.55  |
| 1  | M1 |  9.95  |
| 2  | M1 | 11.95  |
| 1  | M1 |  9.65  |
| 1  | M1 | 14.54  |
+----+----+--------+

With the above dataframe, I can generate a histogram with the code below.

import org.apache.spark.sql.functions.col

val (ranges, counts) = df
    .select(col("trx"))
    .rdd.map(r => r.getDouble(0))
    .histogram(10)
// ranges: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1)

counts contains the number of elements in each range.

But how do I get the sum of the elements, sum(trx), in each range, like:

sumOfTrx: Array[Double] = Array(7.35, 0, 19.6, xx, xx, xx, xx, xx, xx, 25.55)

Upvotes: 0

Views: 1836

Answers (1)

Oli

Reputation: 10406

So you built a histogram over the trx column and you want to get the sum of the values within each range.

What we can do is define a UDF that returns the index of the bucket a given trx value falls into. Then, we can use groupBy to compute the sum within each bucket. Finally, a few manipulations give us the Array you want.

import org.apache.spark.sql.functions.{sum, udf}
import spark.implicits._  // assuming a SparkSession named spark, for the 'trx syntax

// getting the histogram: 11 bucket boundaries and 10 counts
val (ranges, counts) = df
    .select("trx").rdd.map(_.getDouble(0))
    .histogram(10)

// the UDF I was referring to: it maps a trx value to its bucket index,
// clamping the maximum value into the last bucket (as histogram does)
val rangeIndex = udf((x: Double) =>
    math.min(ranges.lastIndexWhere(x >= _), ranges.length - 2))

// Summing the elements and building a map that associates bucket indices to sums
val sumMap = df
    .withColumn("rangeIndex", rangeIndex('trx))
    .groupBy("rangeIndex")
    .agg(sum('trx))
    .rdd.map(x => x.getAs[Int]("rangeIndex") -> x.getAs[Double]("sum(trx)"))
    .collectAsMap

// Building the array: one sum per bucket, 0.0 for empty buckets
counts.indices.map(i => sumMap.getOrElse(i, 0d)).toArray
// res: Array[Double] = Array(7.35, 0.0, 19.6, 33.75, 0.0, 14.54, 0.0, 20.95, 0.0, 25.55)
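As an aside, here is a sketch of the same computation without a UDF, reusing the histogram boundaries as splits for Spark ML's Bucketizer (assuming Spark ML is on the classpath and the ranges and counts values from above are in scope; the column names bucket and sumOfTrx are only illustrative). Like histogram, Bucketizer's last bucket includes its upper bound, so the maximum trx value falls into the last real bucket.

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.functions.sum

// reuse the histogram boundaries as bucket splits
val bucketed = new Bucketizer()
    .setInputCol("trx")
    .setOutputCol("bucket")  // bucket index, stored as a Double
    .setSplits(ranges)
    .transform(df)

// sum trx per bucket and collect a map from bucket index to sum
val bucketSums = bucketed
    .groupBy("bucket")
    .agg(sum("trx").as("sumOfTrx"))
    .rdd.map(r => r.getDouble(0).toInt -> r.getDouble(1))
    .collectAsMap

// one sum per bucket, 0.0 for empty buckets
counts.indices.map(i => bucketSums.getOrElse(i, 0d)).toArray

This produces the same per-bucket sums as the UDF approach, with the bucketing logic delegated entirely to Spark.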

Upvotes: 1
