Christian Zichichi

Average of each field across the elements of an RDD of rows

I have an RDD with many rows (namely, RDDmacReturns) that follows this structure:

case class macReturns (macAddress: String, 
                       hourReturns: Long, 
                       threeHoursReturns: Long,
                       sixHoursReturns: Long, 
                       halfDailyReturns: Long, 
                       dailyReturns: Long,
                       threeDailyReturns: Long, 
                       weeklyReturns: Long, 
                       biWeeklyReturns: Long, 
                       threeWeeklyReturns: Long, 
                       monthlyReturns: Long)

so, for example, a row of that RDD would be like:

macReturns("a2:b2:c3:d3:f4:c5", 3, 4, 1, 0, 3, 4, 3, 5, 1, 7)

The macAddress values have already been grouped, so they are all distinct.

Now I have to create, through transformations/actions on RDDmacReturns, a new RDD with a single row that follows the same structure (case class macReturns) and contains a fixed, chosen (fake) macAddress plus the average of each field computed across all elements of RDDmacReturns, like this:

macReturns("00:00:00:00:00:00",
           averageHourReturns,
           averageThreeHoursReturns,
           averageSixHoursReturns,
           averageHalfDailyReturns,
           averageDailyReturns,
           averageThreeDailyReturns,
           averageWeeklyReturns,
           averageBiWeeklyReturns,
           averageThreeWeeklyReturns,
           averageMonthlyReturns)

To sum up, I need a function that, applied to RDDmacReturns, returns RDDaverageReturns: an RDD containing the single row described just above.
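One way such a function could look, as a minimal sketch using only core RDD operations (map and reduce) rather than a library helper: each row's ten numeric fields are summed element-wise together with a row count, then divided. Assumptions, not from the question: the names AverageReturns and averageReturns are hypothetical, the second input row in main is made up, and each average is rounded to the nearest Long because the case class has no room for fractional values.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Same structure as in the question
case class macReturns(macAddress: String,
                      hourReturns: Long,
                      threeHoursReturns: Long,
                      sixHoursReturns: Long,
                      halfDailyReturns: Long,
                      dailyReturns: Long,
                      threeDailyReturns: Long,
                      weeklyReturns: Long,
                      biWeeklyReturns: Long,
                      threeWeeklyReturns: Long,
                      monthlyReturns: Long)

object AverageReturns {
  // Hypothetical helper: averages each numeric field over all rows and returns
  // a one-row RDD tagged with a fixed, fake MAC address.
  def averageReturns(rdd: RDD[macReturns],
                     fakeMac: String = "00:00:00:00:00:00"): RDD[macReturns] = {
    // Pair each row's 10 numeric fields with a count of 1, then sum both element-wise
    val (sums, count) = rdd
      .map { r =>
        (Array(r.hourReturns, r.threeHoursReturns, r.sixHoursReturns,
               r.halfDailyReturns, r.dailyReturns, r.threeDailyReturns,
               r.weeklyReturns, r.biWeeklyReturns, r.threeWeeklyReturns,
               r.monthlyReturns), 1L)
      }
      .reduce { case ((a, n1), (b, n2)) =>
        (a.zip(b).map { case (x, y) => x + y }, n1 + n2)
      }
    // Assumption: round each average to the nearest Long so it fits the case class
    val avg = sums.map(s => math.round(s.toDouble / count))
    val row = macReturns(fakeMac, avg(0), avg(1), avg(2), avg(3), avg(4),
                         avg(5), avg(6), avg(7), avg(8), avg(9))
    rdd.sparkContext.parallelize(Seq(row))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("averageReturns").setMaster("local[*]"))
    // Illustrative input: the second row is made up for this example
    val rddMacReturns = sc.parallelize(Seq(
      macReturns("a2:b2:c3:d3:f4:c5", 3, 4, 1, 0, 3, 4, 3, 5, 1, 7),
      macReturns("a2:b2:c3:d3:f4:c6", 1, 2, 3, 4, 5, 6, 7, 9, 9, 11)))
    averageReturns(rddMacReturns).collect().foreach(println)
    sc.stop()
  }
}
```

Note that RDD.reduce throws on an empty RDD, so an empty input would need a guard (for example, checking rdd.isEmpty() first). Keeping Double fields instead of rounding would require a different case class.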

Thanks for the help

Answers (1)

mtoto

You could use Statistics.colStats(), which returns an instance of MultivariateStatisticalSummary containing, among other things, the column-wise mean. Here's a reproducible example similar to your problem:

// Run in spark-shell, where `sc` is the predefined SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

val rdd = sc.parallelize(Seq(
  ("id1",1,2,3,4),
  ("id2",3,5,1,5),
  ("id3",3,0,9,8),
  ("id4",4,4,1,2)))
// First, convert each tuple into a dense vector of its numeric fields (as Doubles)
val rdd_dense = rdd.map(x => Vectors.dense(x._2.toDouble, x._3.toDouble, x._4.toDouble, x._5.toDouble))
// Compute the column statistics and grab the column-wise mean
val summary: MultivariateStatisticalSummary = Statistics.colStats(rdd_dense)
println(summary.mean)
// prints: [2.75,2.75,3.5000000000000004,4.75]
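The example above stops at printing the mean. To get the single-row RDD the question asks for, the mean vector can be packed back into the case class. The following is a sketch under stated assumptions: it reuses the question's macReturns structure, rounds each mean to the nearest Long to fit the Long fields, and the names ColStatsAverage and averageWithColStats (and the second input row) are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// Same structure as in the question
case class macReturns(macAddress: String, hourReturns: Long, threeHoursReturns: Long,
                      sixHoursReturns: Long, halfDailyReturns: Long, dailyReturns: Long,
                      threeDailyReturns: Long, weeklyReturns: Long, biWeeklyReturns: Long,
                      threeWeeklyReturns: Long, monthlyReturns: Long)

object ColStatsAverage {
  // Hypothetical helper: column-wise mean via colStats, packed into a one-row RDD
  def averageWithColStats(rdd: RDD[macReturns],
                          fakeMac: String = "00:00:00:00:00:00"): RDD[macReturns] = {
    // One dense vector per row, holding the 10 numeric fields as Doubles
    val vectors = rdd.map(r => Vectors.dense(Array(
      r.hourReturns, r.threeHoursReturns, r.sixHoursReturns, r.halfDailyReturns,
      r.dailyReturns, r.threeDailyReturns, r.weeklyReturns, r.biWeeklyReturns,
      r.threeWeeklyReturns, r.monthlyReturns).map(_.toDouble)))
    // summary.mean is a Vector with one entry per column
    val mean = Statistics.colStats(vectors).mean.toArray
    // Assumption: round each mean to the nearest Long to fit the case class fields
    val m = mean.map(x => math.round(x))
    rdd.sparkContext.parallelize(Seq(
      macReturns(fakeMac, m(0), m(1), m(2), m(3), m(4), m(5), m(6), m(7), m(8), m(9))))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("colStatsAverage").setMaster("local[*]"))
    // Illustrative input: the second row is made up for this example
    val rddMacReturns = sc.parallelize(Seq(
      macReturns("a2:b2:c3:d3:f4:c5", 3, 4, 1, 0, 3, 4, 3, 5, 1, 7),
      macReturns("a2:b2:c3:d3:f4:c6", 1, 2, 3, 4, 5, 6, 7, 9, 9, 11)))
    averageWithColStats(rddMacReturns).collect().foreach(println)
    sc.stop()
  }
}
```

Because colStats computes the mean in floating point (note the 3.5000000000000004 above), a mean that should be exactly halfway between two integers may land on either side of .5 and round accordingly; if exactness matters, keep the Double values instead.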
