Dan Ciborowski - MSFT

Reputation: 7217

Spark Accumulators: is the right-hand map always a single entry, or sometimes many?

I am trying to use a Spark accumulator to replace a group-by query that has poor performance.

import org.apache.spark._

// Custom AccumulatorParam that accumulates a Map[Int, Set[Int]],
// unioning the value sets of any keys present in both maps.
object CountPairsParam extends AccumulatorParam[Map[Int, Set[Int]]] {

  // The identity value for the merge: an empty map.
  def zero(initialValue: Map[Int, Set[Int]]): Map[Int, Set[Int]] =
    Map.empty[Int, Set[Int]]

  // Merge two maps by unioning the sets for each key that appears in either.
  def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
    val keys = m1.keys ++ m2.keys
    keys.map(k => k -> (m1.getOrElse(k, Set.empty[Int]) ++ m2.getOrElse(k, Set.empty[Int]))).toMap
  }
}
val accum = sc.accumulator(Map.empty[Int, Set[Int]])(CountPairsParam)

// Add a single-entry map per row; rows that fail to parse are skipped.
srch_destination_id_distinct.foreach { r =>
  try {
    accum += Map(r(0).toString.toInt -> Set(r(1).toString.toInt))
  } catch {
    case _: NumberFormatException => // ignore malformed rows
  }
}

In my addInPlace I am assuming that m2 isn't always going to be the single-entry map created in my foreach loop, and that Spark will sometimes use this method to merge two maps that each have more than one key. Because of that assumption my implementation merges full maps, and performance suffers. Does the right-hand map always arrive at the accumulator with exactly one entry, set from my foreach loop, or do I need to accept this performance trade-off?
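If the single-entry case does dominate, one option is a drop-in replacement for addInPlace in CountPairsParam above that folds the smaller map into the larger one. This is only a sketch under that assumption: the AccumulatorParam contract does not promise a single-entry right-hand side, but the asymmetric merge keeps the common one-entry case cheap while full map-vs-map merges stay correct.

// Sketch only: assumes the right-hand map is usually small (often one entry);
// the AccumulatorParam contract does not guarantee this.
def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
  // Fold the smaller map into the larger one, so the common case
  // (a one-entry map from the foreach loop) touches only one key.
  val (big, small) = if (m1.size >= m2.size) (m1, m2) else (m2, m1)
  small.foldLeft(big) { case (acc, (k, v)) =>
    acc + (k -> (acc.getOrElse(k, Set.empty[Int]) ++ v))
  }
}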

Upvotes: 0

Views: 1174

Answers (1)

Jason Scott Lenderman

Reputation: 1918

You should generally avoid using Accumulators for anything but debugging because, as far as I know, there's no guarantee that each entry of the RDD will be "added" into the Accumulator exactly once.

Maybe try something like this:

import scala.collection.mutable.HashSet
import scala.util.Try

val result = srch_destination_id_distinct.flatMap(r =>
  // Drop rows that fail to parse, instead of catching exceptions per row.
  Try((r(0).toString.toInt, r(1).toString.toInt)).toOption
).aggregateByKey(HashSet.empty[Int])(
  (set, n)     => set += n,       // seqOp: add one value to a partition-local set
  (set1, set2) => set1 union set2 // combOp: merge the per-partition sets
).mapValues(_.toSet).collectAsMap

The distinction between the seqOp and combOp arguments of aggregateByKey also allows us to avoid "wrapping" each element of the RDD in a Map[Int, Set[Int]] the way your approach does.
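To make that distinction concrete, here is a minimal, self-contained illustration (the sample data is made up for this sketch): seqOp folds individual values into a partition-local set, so no per-element Map is ever allocated, and combOp only runs when partition results are merged.

import scala.collection.mutable.HashSet

// Hypothetical sample data for the sketch, spread over two partitions.
val pairs = sc.parallelize(Seq((1, 10), (1, 11), (2, 20)), numSlices = 2)

val grouped = pairs.aggregateByKey(HashSet.empty[Int])(
  (set, n)     => set += n,     // seqOp: one value at a time, within a partition
  (set1, set2) => set1 ++= set2 // combOp: merge two partition-local sets
).collectAsMap

// grouped == Map(1 -> HashSet(10, 11), 2 -> HashSet(20))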

Upvotes: 3
