Reputation: 2489
I have an RDD of Breeze Vectors and want to calculate their average. My first approach is to use aggregate:
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfterAll, FunSuite, Matchers, Suite }
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import breeze.linalg.{ Vector => BreezeVector }

class CalculateMean extends FunSuite with Matchers with GeneratorDrivenPropertyChecks with SparkSpec {

  test("Calculate mean") {
    type U = (BreezeVector[Double], Int)
    type T = BreezeVector[Double]

    val rdd: RDD[T] = sc.parallelize(List(1.0, 2, 3, 4, 5, 6).map { x => BreezeVector(x, x * x) }, 2)

    val zeroValue = (BreezeVector.zeros[Double](2), 0)
    val seqOp = (agg: U, x: T) => (agg._1 + x, agg._2 + 1)
    val combOp = (xs: U, ys: U) => (xs._1 + ys._1, xs._2 + ys._2)
    val mean = rdd.aggregate(zeroValue)(seqOp, combOp)

    println(mean._1 / mean._2.toDouble)
  }
}
/**
 * Setup and tear down spark context
 */
trait SparkSpec extends BeforeAndAfterAll {
  this: Suite =>

  private val master = "local[2]"
  private val appName = this.getClass.getSimpleName

  private var _sc: SparkContext = _

  def sc: org.apache.spark.SparkContext = _sc

  val conf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)

  override def beforeAll(): Unit = {
    super.beforeAll()
    _sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }
    super.afterAll()
  }
}
However, this algorithm may be numerically unstable (see https://stackoverflow.com/a/1346890/1037094).
How can I implement Knuth's algorithm for Breeze Vectors in Spark, and is rdd.aggregate the recommended way to do it?
Upvotes: 3
Views: 1580
Reputation: 330093
How can I implement Knuth's algorithm for Breeze Vectors in Spark, and is rdd.aggregate the recommended way to do it?
aggregate could be a good way to do it, if the algorithm described by Knuth were the right choice. Unfortunately it isn't, or at least not without some tweaking. It is an inherently sequential streaming algorithm, and the function it applies is not associative. Let's assume that you have a function knuth_mean. It should be clear that (ignoring counting and single element cases):
(knuth_mean (knuth_mean (knuth_mean 1 2) 3) 4)
is not the same as
(knuth_mean (knuth_mean 1 2) (knuth_mean 3 4))
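A minimal sketch makes the difference concrete (assuming knuth_mean, written here as knuthMean, is just the pairwise running-mean update with counts ignored, as above):

// Hypothetical two-argument knuth_mean that ignores counts:
// it just moves the left value halfway towards the right one.
def knuthMean(a: Double, b: Double): Double = a + (b - a) / 2.0

knuthMean(knuthMean(knuthMean(1.0, 2.0), 3.0), 4.0) // 3.125
knuthMean(knuthMean(1.0, 2.0), knuthMean(3.0, 4.0)) // 2.5, the true mean of 1..4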
Still, you can use Knuth's algorithm to get an average per partition:
def partMean(n: Int)(iter: Iterator[BreezeVector[Double]]) = {
  val partialMean = iter.foldLeft((BreezeVector.zeros[Double](n), 0.0))(
    (acc: (BreezeVector[Double], Double), v: BreezeVector[Double]) =>
      (acc._1 + (v - acc._1) / (acc._2 + 1.0), acc._2 + 1.0))
  Iterator(partialMean)
}

val means = rdd.mapPartitions(partMean(lengthOfVector))
The problem that remains is how to aggregate these partial results. A direct application of Knuth's algorithm would require unfolding the partitions, and that pretty much defeats the whole purpose of using Spark. You can take a look at the StatCounter.merge method to see how it is handled internally in Spark.
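For illustration, a sketch of such a merge (my own names, modeled on the weighted-delta update that StatCounter.merge uses) could combine the (mean, count) pairs produced by partMean:

// Sketch: merge two partial (mean, count) pairs with a weighted delta update,
// similar in spirit to StatCounter.merge. Assumes the total count is non-zero.
def mergeMeans(
  x: (BreezeVector[Double], Double),
  y: (BreezeVector[Double], Double)): (BreezeVector[Double], Double) = {
  val (xMean, xN) = x
  val (yMean, yN) = y
  val n = xN + yN
  val delta = yMean - xMean
  // Shift xMean towards yMean in proportion to yMean's weight.
  (xMean + delta * (yN / n), n)
}

val (mean, count) = means.reduce(mergeMeans)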
Upvotes: 1