Reputation: 7217
I am trying to use a Spark accumulator to eliminate a group-by query that has poor performance.
import org.apache.spark._

object CountPairsParam extends AccumulatorParam[Map[Int, Set[Int]]] {
  def zero(initialValue: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
    Map.empty[Int, Set[Int]]
  }

  def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
    val keys = m1.keys ++ m2.keys
    keys.map((k: Int) => k -> (m1.getOrElse(k, Set.empty[Int]) ++ m2.getOrElse(k, Set.empty[Int]))).toMap
  }
}

val accum = sc.accumulator(Map.empty[Int, Set[Int]])(CountPairsParam)

srch_destination_id_distinct.foreach { r =>
  try {
    accum += Map(r(0).toString.toInt -> Set(r(1).toString.toInt))
  } catch {
    case ioe: NumberFormatException => Map.empty[Int, Set[Int]]
  }
}
In my accumulator I am assuming that m2 is not always going to be the single-item map created in my foreach loop, and that Spark will sometimes use this method to merge two maps that each have more than one key. Because of that assumption my addInPlace rebuilds the whole map on every add, and performance suffers. Does the right-hand Map always come into the accumulator as the single-item map set from my foreach loop, or do I need to accept this performance trade-off?
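For reference, this is the kind of addInPlace I have in mind if the right-hand map really is always the small per-record one (purely a sketch, not something I have verified):

// Illustrative only: fold the (presumably small) right-hand map into the left
// instead of rebuilding the union of all keys on every add.
def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] =
  m2.foldLeft(m1) { case (acc, (k, vs)) =>
    acc.updated(k, acc.getOrElse(k, Set.empty[Int]) ++ vs)
  }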
Upvotes: 0
Views: 1174
Reputation: 1918
You should generally avoid using Accumulators for anything but debugging because, as far as I know, there is no guarantee that each entry of the RDD will be "added" into the Accumulator exactly once.
Maybe try something like this:
import scala.collection.mutable.HashSet
import scala.util.Try

val result = srch_destination_id_distinct.flatMap(r =>
  // Drop rows whose fields don't parse as integers instead of catching the exception
  Try((r(0).toString.toInt, r(1).toString.toInt)).toOption
).aggregateByKey(HashSet.empty[Int])(
  (set, n) => set += n,           // seqOp: add one value to the per-partition set
  (set1, set2) => set1 union set2 // combOp: merge sets built on different partitions
).mapValues(_.toSet).collectAsMap
The distinction between the seqOp and combOp arguments of aggregateByKey also allows us to avoid "wrapping" each element of the RDD in a Map[Int, Set[Int]] the way your approach does.
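To make that distinction concrete, the two functions passed to aggregateByKey above could be written out with explicit types like this (a sketch, assuming the same HashSet[Int] accumulator):

// seqOp: folds one raw Int value into the per-partition accumulator
val seqOp: (HashSet[Int], Int) => HashSet[Int] = (set, n) => set += n

// combOp: merges two per-partition accumulators for the same key
val combOp: (HashSet[Int], HashSet[Int]) => HashSet[Int] = (set1, set2) => set1 union set2

They would then be passed as aggregateByKey(HashSet.empty[Int])(seqOp, combOp).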
Upvotes: 3