mitchus

Reputation: 4877

Count occurrences of each item in a Scala parallel collection

My question is very similar to "Count occurrences of each element in a List[List[T]] in Scala", except that I would like an efficient solution that uses parallel collections.

Specifically, I have a large (~10^7) vector vec of short (~10) lists of Ints, and I would like to get, for each Int x, the number of times x occurs, for example as a Map[Int,Int]. The number of distinct integers is on the order of 10^6.

Since the machine this needs to run on has a fair amount of memory (150 GB) and many cores (>100), parallel collections seem like a good choice. Is the code below a good approach?

val flatpvec = vec.par.flatten
val flatvec = flatpvec.seq
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatvec.count(_ == x)))
counts.toMap

Or are there better solutions? In case you are wondering about the .seq conversion: for some reason the following code doesn't seem to terminate, even for small examples:

val flatpvec = vec.par.flatten
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatpvec.count(_ == x)))
counts.toMap

Upvotes: 4

Views: 1429

Answers (2)

som-snytt

Reputation: 39577

Here is one way to do it with aggregate. aggregate is like fold, except that you also supply a function that combines the results of the sequential folds.
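To make the shape of aggregate concrete, here is a minimal sequential sketch that uses only the standard library. The helper chunkedAggregate is hypothetical (it is not a library method, and it runs on one thread); it only mimics the structure: fold each chunk with the sequential operation, then merge the per-chunk results with the combining operation.

```scala
object AggregateSketch {
  // Hypothetical helper, not part of the standard library: mimics the shape of
  // aggregate by folding each chunk with seqop, then merging with combop.
  def chunkedAggregate[A, B](xs: Seq[A], chunkSize: Int)(z: => B)(
      seqop: (B, A) => B, combop: (B, B) => B): B =
    xs.grouped(chunkSize)            // split the input into chunks
      .map(_.foldLeft(z)(seqop))     // sequential fold within each chunk
      .reduceOption(combop)          // combine the per-chunk results
      .getOrElse(z)                  // empty input: just the zero value

  val data = Seq(1, 2, 2, 3, 3, 3, 1)

  // Count occurrences: seqop bumps one key, combop adds two partial count maps.
  val counts: Map[Int, Int] =
    chunkedAggregate(data, chunkSize = 3)(Map.empty[Int, Int])(
      (m, x) => m.updated(x, m.getOrElse(x, 0) + 1),
      (a, b) => b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }
    )
}
```

In a real parallel collection the chunks would be processed by separate tasks, which is why the zero value has to be safe to create more than once.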

Update: It's not surprising that there is overhead in .par.groupBy, but I was surprised by the constant factor. By these numbers, you would never count that way. Also, I had to bump the memory way up.

The interesting technique used to build the result map is described in this paper linked from the overview. (It cleverly saves the intermediate results and then coalesces them in parallel at the end.)

But copying around the intermediate results of the groupBy turns out to be expensive, if all you really want is a count.
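As a small illustration of that difference, the sketch below counts the same toy list in two ways. The object name and the data are made up for the example; the point is only that groupBy first builds a collection of every occurrence per key, while a direct counter keeps a single Int per key.

```scala
import scala.collection.mutable

object CountSketch {
  val xs = List(3, 1, 3, 2, 1, 3)

  // groupBy materialises every occurrence before counting:
  // the intermediate value is a Map[Int, List[Int]].
  val viaGroupBy: Map[Int, Int] =
    xs.groupBy(identity).map { case (k, vs) => (k, vs.size) }

  // Direct counting keeps a single Int per distinct key.
  val viaCounter: Map[Int, Int] = {
    val m = mutable.Map.empty[Int, Int]
    for (x <- xs) m(x) = m.getOrElse(x, 0) + 1
    m.toMap
  }
}
```

Both values hold the same counts; only the amount of intermediate data differs.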

The numbers below compare sequential groupBy, parallel groupBy, and finally aggregate. Times are in milliseconds.

apm@mara:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test
GroupBy: Starting...
Finished in 12695
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Par GroupBy: Starting...
Finished in 51481
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Aggregate: Starting...
Finished in 2672
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))

Nothing magical in the test code.

import collection.GenTraversableOnce
import collection.concurrent.TrieMap
import collection.mutable

import concurrent.duration._

trait Timed {
  def now = System.nanoTime
  def timed[A](op: =>A): A =  {
    val start = now
    val res = op
    val end = now
    val lapsed = (end - start).nanos.toMillis
    Console println s"Finished in $lapsed"
    res
  }
  def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = {
    Console println s"$title: Starting..."
    val res = timed(op)
    //val showable = res.toIterator.min   //(res.toIterator take 10).toList
    val showable = res.toList.sorted take 10
    Console println s"$title: $showable"
  }
}

The test generates some random data to make it interesting.

object Test extends App with Timed {

  val upto = math.pow(10,6).toInt
  val ran = new java.util.Random
  val ten = (1 to 10).toList
  val maxSamples = 1000
  // samples of ten random numbers in the desired range
  val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto))
  // pick a sample at random
  def anyten = samples(ran nextInt maxSamples)
  def mag = 7
  val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten)

The sequential operation and the combining operation of aggregate are invoked from a task, and the result is assigned to a volatile var.

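  // zero value: a def, so each use gets a fresh empty map
  def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int]
  // sequential op: add the Ints of one list to the running counts
  def so(m: mutable.Map[Int,Int], is: List[Int]) = {
    for (i <- is) {
      val v = m.getOrElse(i, 0)
      m(i) = v + 1
    }
    m
  }
  // combining op: merge the counts in n into m
  def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = {
    for ((i, count) <- n) {
      val v = m.getOrElse(i, 0)
      m(i) = v + count
    }
    m
  }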
  showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Aggregate", data.par.aggregate(z)(so, co))
}

Upvotes: 4

Björn Jacobs

Reputation: 4272

If you want to use parallel collections and the standard Scala tools, you could do it like this. Group your collection by identity and then map each group to (Value, Count):

scala> val longList = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)
longList: List[Int] = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)

scala> longList.par.groupBy(x => x)
res0: scala.collection.parallel.immutable.ParMap[Int,scala.collection.parallel.immutable.ParSeq[Int]] = ParMap(5 -> ParVector(5), 1 -> ParVector(1, 1), 2 -> ParVector(2, 2, 2), 7 -> ParVector(7, 7, 7), 3 -> ParVector(3, 3, 3), 4 -> ParVector(4))

scala> longList.par.groupBy(x => x).map(x => (x._1, x._2.size))
res1: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)

Or, even nicer, as pagoda_5b suggested in the comments:

scala> longList.par.groupBy(identity).mapValues(_.size)
res1: scala.collection.parallel.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)
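The question starts from a vector of lists rather than a flat list, so a flatten step is needed first. The sketch below shows that pipeline on sequential collections only, so that it runs with the standard library alone. The object name and the tiny input are made up for the example, and it is assumed that the parallel variant would start from vec.par.flatten, as in the question.

```scala
object NestedCountSketch {
  // Small stand-in for the question's large Vector[List[Int]].
  val vec: Vector[List[Int]] =
    Vector(List(1, 5, 2), List(3, 7, 4, 2), List(3, 7, 3), List(2, 1, 7))

  // Flatten first, then group by identity and count each group.
  // Assumption: with parallel collections available, the same pipeline
  // would start from vec.par.flatten instead of vec.flatten.
  val counts: Map[Int, Int] =
    vec.flatten.groupBy(identity).map { case (k, vs) => (k, vs.size) }
}
```

The flattened elements are the same as longList above, so the counts match the REPL output.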

Upvotes: 2
