Manuel Schmidt

Reputation: 2489

Calculate average over RDD[Vector] in Spark

I have an RDD of Breeze vectors and want to calculate their average. My first approach is to use aggregate:

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfterAll, FunSuite, Matchers, Suite }
import org.scalatest.prop.GeneratorDrivenPropertyChecks

import breeze.linalg.{ Vector => BreezeVector }

class CalculateMean extends FunSuite with Matchers with GeneratorDrivenPropertyChecks with SparkSpec {

  test("Calculate mean") {

    type U = (BreezeVector[Double], Int)
    type T = BreezeVector[Double]
    val rdd: RDD[T] = sc.parallelize(List(1.0, 2, 3, 4, 5, 6).map { x => BreezeVector(x, x * x) }, 2)

    val zeroValue = (BreezeVector.zeros[Double](2), 0)
    val seqOp = (agg: U, x: T) => (agg._1 + x, agg._2 + 1)
    val combOp = (xs: U, ys: U) => (xs._1 + ys._1, xs._2 + ys._2)

    val mean = rdd.aggregate(zeroValue)(seqOp, combOp)
    println(mean._1 / mean._2.toDouble)

  }

}

/**
 * Setup and tear down spark context
 */
trait SparkSpec extends BeforeAndAfterAll {
  this: Suite =>

  private val master = "local[2]"
  private val appName = this.getClass.getSimpleName

  private var _sc: SparkContext = _

  def sc: org.apache.spark.SparkContext = _sc

  val conf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)

  override def beforeAll(): Unit = {
    super.beforeAll()
    _sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }

    super.afterAll()
  }
}

However, this algorithm may be numerically unstable (see https://stackoverflow.com/a/1346890/1037094).

How can I implement Knuth's algorithm for Breeze vectors in Spark, and is rdd.aggregate the recommended way to do it?

Upvotes: 3

Views: 1580

Answers (1)

zero323

Reputation: 330093

How can I implement Knuth's algorithm for Breeze vectors in Spark, and is rdd.aggregate the recommended way to do it?

aggregate could be a good way to do it if the algorithm described by Knuth were the right choice. Unfortunately it isn't, or at least not without some tweaking. It is an inherently sequential streaming algorithm, and the function it applies is not associative. Let's assume that you have a function knuth_mean. It should be clear that (ignoring counting and single-element cases):

(knuth_mean (knuth_mean (knuth_mean 1 2) 3) 4)

is not the same as

(knuth_mean (knuth_mean 1 2) (knuth_mean 3 4))
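To make the difference concrete, here is a small sketch in plain Scala. The `step` helper and the object name are hypothetical, and the tree-shaped call assumes the second partial mean is fed in as if it were the third raw element, which is one way to read "ignoring counting".

```scala
object KnuthOrderDemo {
  // One update of the running mean: mean_k = mean_{k-1} + (x_k - mean_{k-1}) / k
  def step(mean: Double, x: Double, k: Int): Double = mean + (x - mean) / k

  // Sequential application over 1, 2, 3, 4
  val sequential: Double = step(step(step(1.0, 2.0, 2), 3.0, 3), 4.0, 4)

  // Tree-shaped application: means of (1, 2) and (3, 4), then one more update
  // that treats the second mean as a single element (an assumption of this sketch).
  val left: Double = step(1.0, 2.0, 2)
  val right: Double = step(3.0, 4.0, 2)
  val treeShaped: Double = step(left, right, 3)
}
```

Here `sequential` is the true mean, 2.5, while `treeShaped` comes out at roughly 2.17, because the update rule has no way of knowing that its second argument already summarizes two elements.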

Still, you can use Knuth's algorithm to get an average per partition:

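// Running (Knuth-style) mean over a single partition; n is the vector length.
// Yields one (mean, count) pair; an empty partition yields (zeros, 0.0).
def partMean(n: Int)(iter: Iterator[BreezeVector[Double]]) = {
  val partialMean = iter.foldLeft((BreezeVector.zeros[Double](n), 0.0))(
    (acc: (BreezeVector[Double], Double), v: BreezeVector[Double]) =>
      (acc._1 + (v - acc._1) / (acc._2 + 1.0), acc._2 + 1.0))
  Iterator(partialMean)
}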

val means = rdd.mapPartitions(partMean(lengthOfVector))

The problem remains how to aggregate these partial results. Directly applying Knuth's algorithm would require unfolding the partitions, which pretty much defeats the whole purpose of using Spark. You can look at the StatCounter.merge method to see how it is handled internally in Spark.
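One possible way to combine the per-partition results is to weight each partial mean by its element count. The sketch below is only an illustration of that idea, not Spark's StatCounter code: the object name, the `Partial` alias and the `merge` helper are hypothetical, and plain `Array[Double]` stands in for Breeze vectors so the snippet has no dependencies.

```scala
object MergePartialMeans {
  // A partial result: the mean of some elements and how many elements it covers.
  type Partial = (Array[Double], Long)

  // Count-weighted merge: mean = meanA + (meanB - meanA) * nB / (nA + nB).
  // Partials with a zero count (e.g. from empty partitions) are passed over.
  def merge(a: Partial, b: Partial): Partial = {
    val (meanA, nA) = a
    val (meanB, nB) = b
    if (nA == 0L) b
    else if (nB == 0L) a
    else {
      val n = nA + nB
      val weight = nB.toDouble / n
      val merged = meanA.zip(meanB).map { case (x, y) => x + (y - x) * weight }
      (merged, n)
    }
  }
}
```

Because the counts travel with the means, the merge gives the same result (up to floating-point rounding) regardless of how the partials are grouped, so the same formula written against Breeze vectors could in principle be passed to `reduce` on the `means` RDD.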

Upvotes: 1
