Daniel Imberman

Reputation: 638

aggregating multiple values at once

So I'm running into a speed issue where I have a dataset that needs to be aggregated multiple times.

Initially, my team set up three accumulators and ran a single foreach loop over the data, something along the lines of:

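// A, B, C are the accumulated result types; T is the element type of data: RDD[T]
val accum1: Accumulable[A, T] = ???
val accum2: Accumulable[B, T] = ???
val accum3: Accumulable[C, T] = ???

// A single pass over the data feeds all three accumulators
data.foreach { u =>
  accum1 += u
  accum2 += u
  accum3 += u
}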

I am trying to replace these accumulators with an aggregation so that I get a speed boost and keep accumulators available for debugging. I am currently looking for a way to aggregate these three types at once, since running three separate aggregations is significantly slower. Does anyone have any thoughts on how I can do this? Perhaps aggregating agnostically and then pattern matching to split the result into separate RDDs?

Thank you

Upvotes: 0

Views: 135

Answers (1)

zero323

Reputation: 330453

As far as I can tell, all you need here is aggregate, with zeroValue, seqOp and combOp corresponding to the operations performed by your accumulators.
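To make the roles of the three arguments concrete, here is a Spark-free sketch of the contract RDD.aggregate follows: every partition is folded with seqOp starting from zeroValue, and the per-partition results are then folded together with combOp, again starting from zeroValue. The helper name aggregateLocal and the representation of partitions as a Seq[Seq[T]] are assumptions made for illustration only. In Spark the partial results are merged on the driver in whatever order tasks finish, so combOp should be associative and commutative.

```scala
object AggregateSketch {
  // Hypothetical helper mimicking the RDD.aggregate contract without Spark:
  // fold each partition with seqOp from zeroValue, then fold the partial
  // results together with combOp, also starting from zeroValue.
  def aggregateLocal[T, U](partitions: Seq[Seq[T]])(zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U =
    partitions
      .map(_.foldLeft(zeroValue)(seqOp)) // per-partition pass (executors in Spark)
      .foldLeft(zeroValue)(combOp)        // merge of partial results (driver in Spark)

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5), Seq.empty[Int])
    // Count and sum computed in one pass, carried together as a tuple
    val (count, sum) = aggregateLocal(parts)((0, 0))(
      (acc, x) => (acc._1 + 1, acc._2 + x),
      (l, r) => (l._1 + r._1, l._2 + r._2))
    println(s"count=$count sum=$sum")
  }
}
```

Carrying several results in one tuple is what lets a single pass replace several separate aggregations.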

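import org.apache.spark.rdd.RDD

// A, B, C are the result types of accum1, accum2, accum3; T is the element type of rdd.
// addAccumulator / addInPlace below refer to the AccumulableParam behind each accumulator.
val zeroValue: (A, B, C) = ??? // (accum1.zero, accum2.zero, accum3.zero)

// Folds a single element into the running triple
def seqOp(r: (A, B, C), t: T): (A, B, C) = r match {
  case (a, b, c) =>
    // Apply operations equivalent to
    // accum1.addAccumulator(a, t)
    // accum2.addAccumulator(b, t)
    // accum3.addAccumulator(c, t)
    // and return the updated triple (r itself if a, b and c are mutated in place)
    ???
}

// Merges two partial triples computed on different partitions
def combOp(r1: (A, B, C), r2: (A, B, C)): (A, B, C) = (r1, r2) match {
  case ((a1, b1, c1), (a2, b2, c2)) =>
    // Apply operations equivalent to
    // accum1.addInPlace(a1, a2)
    // accum2.addInPlace(b1, b2)
    // accum3.addInPlace(c1, c2)
    // and return the merged triple (r1 itself if it is mutated in place)
    ???
}

val rdd: RDD[T] = ???

val accums: (A, B, C) = rdd.aggregate(zeroValue)(seqOp, combOp)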
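As an illustration only, suppose the three accumulators computed a count (Long), a sum (Double) and a maximum (Double) over an RDD[Double]; these types and operations are assumptions, not taken from the question. The zeroValue, seqOp and combOp of the skeleton above then become ordinary tuple functions, and with Spark the final call would be rdd.aggregate(zeroValue)(seqOp, combOp). The main method below stands in for Spark by treating each inner Seq as one partition, so the sketch runs without a cluster.

```scala
object ThreeStats {
  // Hypothetical stand-ins for the three accumulators:
  // A = Long (count), B = Double (sum), C = Double (max)
  type Stats = (Long, Double, Double)

  // Neutral element: no elements seen, zero sum, max starts at -infinity
  val zeroValue: Stats = (0L, 0.0, Double.NegativeInfinity)

  // Folds one element into the running triple (what each accumulator's += did)
  def seqOp(r: Stats, t: Double): Stats = r match {
    case (count, sum, max) => (count + 1, sum + t, math.max(max, t))
  }

  // Merges two partial triples; associative and commutative as Spark expects
  def combOp(r1: Stats, r2: Stats): Stats = (r1, r2) match {
    case ((c1, s1, m1), (c2, s2, m2)) => (c1 + c2, s1 + s2, math.max(m1, m2))
  }

  def main(args: Array[String]): Unit = {
    // Each inner Seq plays the role of one partition (no Spark needed here)
    val parts = Seq(Seq(1.0, 4.0), Seq(2.5), Seq.empty[Double])
    val result = parts.map(_.foldLeft(zeroValue)(seqOp)).foldLeft(zeroValue)(combOp)
    println(result)
  }
}
```

Note that an empty partition simply contributes zeroValue, which is why zeroValue has to be neutral for combOp.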

Upvotes: 1
