Reputation: 4877
My question is very similar to Count occurrences of each element in a List[List[T]] in Scala, except that I would like to have an efficient solution involving parallel collections.
Specifically, I have a large (~10^7) vector vec
of short (~10) lists of Ints, and I would like to get for each Int x
the number of times x
occurs, for example as a Map[Int,Int]
. The number of distinct integers is of the order 10^6.
Since the machine this needs to be done on has a fair amount of memory (150GB) and number of cores (>100) it seems like parallel collections would be a good choice for this. Is the code below a good approach?
val flatpvec = vec.par.flatten
val flatvec = flatpvec.seq
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatvec.count(_ == x)))
counts.toMap
Or are there better solutions? In case you are wondering about the .seq conversion: for some reason the following code doesn't seem to terminate, even for small examples:
val flatpvec = vec.par.flatten
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatpvec.count(_ == x)))
counts.toMap
Upvotes: 4
Views: 1429
Reputation: 39577
This does something. aggregate
is like fold
except you also combine the results of the sequential folds.
Update: It's not surprising that there is overhead in .par.groupBy
, but I was surprised by the constant factor. By these numbers, you would never count that way. Also, I had to bump the memory way up.
The interesting technique used to build the result map is described in this paper linked from the overview. (It cleverly saves the intermediate results and then coalesces them in parallel at the end.)
But copying around the intermediate results of the groupBy
turns out to be expensive, if all you really want is a count.
The numbers are comparing sequential groupBy
, parallel, and finally aggregate
.
apm@mara:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test
GroupBy: Starting...
Finished in 12695
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Par GroupBy: Starting...
Finished in 51481
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Aggregate: Starting...
Finished in 2672
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Nothing magical in the test code.
import collection.GenTraversableOnce
import collection.concurrent.TrieMap
import collection.mutable
import concurrent.duration._
trait Timed {
def now = System.nanoTime
def timed[A](op: =>A): A = {
val start = now
val res = op
val end = now
val lapsed = (end - start).nanos.toMillis
Console println s"Finished in $lapsed"
res
}
def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = {
Console println s"$title: Starting..."
val res = timed(op)
//val showable = res.toIterator.min //(res.toIterator take 10).toList
val showable = res.toList.sorted take 10
Console println s"$title: $showable"
}
}
It generates some random data for interest.
object Test extends App with Timed {
val upto = math.pow(10,6).toInt
val ran = new java.util.Random
val ten = (1 to 10).toList
val maxSamples = 1000
// samples of ten random numbers in the desired range
val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto))
// pick a sample at random
def anyten = samples(ran nextInt maxSamples)
def mag = 7
val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten)
The sequential operation and the combining operation of aggregate
are invoked from a task, and the result is assigned to a volatile var.
def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int]
def so(m: mutable.Map[Int,Int], is: List[Int]) = {
for (i <- is) {
val v = m.getOrElse(i, 0)
m(i) = v + 1
}
m
}
def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = {
for ((i, count) <- n) {
val v = m.getOrElse(i, 0)
m(i) = v + count
}
m
}
showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) })
showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) })
showtime("Aggregate", data.par.aggregate(z)(so, co))
}
Upvotes: 4
Reputation: 4272
If you want to make use of parallel collections and Scala standard tools, you could do it like that. Group your collection by the identity and then map it to (Value, Count):
scala> val longList = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)
longList: List[Int] = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)
scala> longList.par.groupBy(x => x)
res0: scala.collection.parallel.immutable.ParMap[Int,scala.collection.parallel.immutable.ParSeq[Int]] = ParMap(5 -> ParVector(5), 1 -> ParVector(1, 1), 2 -> ParVector(2, 2, 2), 7 -> ParVector(7, 7, 7), 3 -> ParVector(3, 3, 3), 4 -> ParVector(4))
scala> longList.par.groupBy(x => x).map(x => (x._1, x._2.size))
res1: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)
Or even nicer like pagoda_5b suggested in the comments:
scala> longList.par.groupBy(identity).mapValues(_.size)
res1: scala.collection.parallel.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)
Upvotes: 2