Reputation: 638
So I'm running into a speed issue where I have a dataset that needs to be aggregated multiple times.
Initially my team had set up three accumulators and were running a single foreach loop over the data. Something along the lines of
val accum1:Accumulable[a]
val accum2: Accumulable[b]
val accum3: Accumulable[c]
data.foreach{
u =>
accum1+=u
accum2 += u
accum3 += u
}
I am trying to switch these accumulations into an aggregation so that I can get a speed boost and have access to accumulators for debugging. I am currently trying to figure out a way to aggregate these three types at once, since running 3 separate aggregations is significantly slower. Does anyone have any thoughts as to how I can do this? Perhaps aggregating agnostically then pattern matching to split into two RDDs?
Thank you
Upvotes: 0
Views: 135
Reputation: 330453
As far as I can tell all you need here is aggregate
with zeroValue
, seqOp
and combOp
corresponding to the operations which are performed by your accumulators.
val zeroValue: (A, B, C) = ??? // (accum1.zero, accum2.zero, accum3.zero)
def seqOp(r: (A, B, C), t: T): (A, B, C) = r match {
case (a, b, c) => {
// Apply operations equivalent to
// accum1.addAccumulator(a, t)
// accum2.addAccumulator(c, t))
// accum3.addAccumulator(c, t)
// and return the first argument
// r
}
}
def combOp(r1: (A, B, C), r2: (A, B, C)): (A, B, C) = (r1, r2) match {
case ((a1, b1, c1), (a2, b2, c2)) => {
// Apply operations equivalent to
// acc1.addInPlace(a1, a2)
// acc2.addInPlace(b1, b2)
// acc3.addInPlace(c1, c2)
// and return the first argument
// r1
}
}
val rdd: RDD[T] = ???
val accums: (A, B, C) = rdd.aggregate(zeroValue)(seqOp, combOp)
Upvotes: 1