Reputation: 5
I have an RDD of many rows (namely, RDDmacReturns) that follows this structure:
case class macReturns (macAddress: String,
hourReturns: Long,
threeHoursReturns: Long,
sixHoursReturns: Long,
halfDailyReturns: Long,
dailyReturns: Long,
threeDailyReturns: Long,
weeklyReturns: Long,
biWeeklyReturns: Long,
threeWeeklyReturns: Long,
monthlyReturns: Long)
So, for example, a row of that RDD would look like:
macReturns(a2:b2:c3:d3:f4:c5,3,4,1,0,3,4,3,5,1,7)
macAddresses have already been grouped so they are all distinct.
Now I have to create a new RDD with a single row by applying transformations/actions to RDDmacReturns. The row should follow the same structure (case class macReturns) and contain a fixed, chosen (fake) macAddress and the average of each field computed over all elements of RDDmacReturns, like this:
macReturns(00:00:00:00:00:00,
averageHourReturns,
averageThreeHoursReturns,
averageSixHoursReturns,
averageHalfDailyReturns,
averageDailyReturns,
averageThreeDailyReturns,
averageWeeklyReturns,
averageBiWeeklyReturns,
averageThreeWeeklyReturns,
averageMonthlyReturns)
To sum up, I need a function that, applied to RDDmacReturns, returns RDDaverageReturns, an RDD containing the single row described just above.
Thanks for the help
Upvotes: 0
Views: 348
Reputation: 24178
You could use colStats(), which returns an instance of MultivariateStatisticalSummary containing, among other things, the column-wise mean. Here's a reproducible example similar to your problem:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
val rdd = sc.parallelize(Seq(
("id1",1,2,3,4),
("id2",3,5,1,5),
("id3",3,0,9,8),
("id4",4,4,1,2)))
// First, convert each row to a dense vector, dropping the id column
val rdd_dense = rdd.map(x => Vectors.dense(x._2, x._3, x._4, x._5))
// Compute the column statistics and print the column-wise mean
val summary: MultivariateStatisticalSummary = Statistics.colStats(rdd_dense)
println(summary.mean)
[2.75,2.75,3.5000000000000004,4.75]
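To map this back onto the macReturns case class from your question, something along these lines might work. Treat it as a sketch rather than a finished solution: averageReturns is just a name I made up, and because the case class fields are Long, the averages are rounded with math.round, which is my assumption (change the fields to Double if you need the fractional part). It also does not handle an empty RDD.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

case class macReturns(macAddress: String,
                      hourReturns: Long,
                      threeHoursReturns: Long,
                      sixHoursReturns: Long,
                      halfDailyReturns: Long,
                      dailyReturns: Long,
                      threeDailyReturns: Long,
                      weeklyReturns: Long,
                      biWeeklyReturns: Long,
                      threeWeeklyReturns: Long,
                      monthlyReturns: Long)

// Hypothetical helper: returns a single-row RDD holding the column-wise averages.
def averageReturns(rdd: RDD[macReturns]): RDD[macReturns] = {
  // Turn the ten numeric fields of each row into a dense vector
  val vectors = rdd.map { r =>
    Vectors.dense(Array(
      r.hourReturns, r.threeHoursReturns, r.sixHoursReturns,
      r.halfDailyReturns, r.dailyReturns, r.threeDailyReturns,
      r.weeklyReturns, r.biWeeklyReturns, r.threeWeeklyReturns,
      r.monthlyReturns).map(_.toDouble))
  }
  // Column-wise means, rounded to Long to fit the case class (assumption)
  val m = Statistics.colStats(vectors).mean.toArray.map(d => math.round(d))
  val row = macReturns("00:00:00:00:00:00",
    m(0), m(1), m(2), m(3), m(4), m(5), m(6), m(7), m(8), m(9))
  // Wrap the single row in an RDD again
  rdd.sparkContext.parallelize(Seq(row))
}

// Usage, assuming a spark-shell session where sc is already defined
val RDDmacReturns = sc.parallelize(Seq(
  macReturns("a2:b2:c3:d3:f4:c5", 3, 4, 1, 0, 3, 4, 3, 5, 1, 7),
  macReturns("a2:b2:c3:d3:f4:c6", 1, 2, 3, 4, 1, 2, 1, 3, 3, 1)))
val RDDaverageReturns = averageReturns(RDDmacReturns)
RDDaverageReturns.collect().foreach(println)
```

The averaging itself happens inside colStats in one pass over the data; only the ten resulting means are brought back to the driver, so wrapping them with parallelize is cheap.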
Upvotes: 1