user1312837

Reputation: 1298

How do I set the number of threads for parallel collections?

I can run Scala's foreach in parallel like this:

val N = 100
(0 until N).par.foreach(i => {
   // do something
})

But how can I set the number of threads? I want something like this:

val N = 100
val NThreads = 5
(0 until N).par.foreach(NThreads, i => {
   // do something
})

Upvotes: 19

Views: 9911

Answers (2)

Avseiytsev Dmitriy

Reputation: 1160

The official Scala documentation shows how to change the task support of a parallel collection:

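import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
// On Scala 2.12 and later, pass a java.util.concurrent.ForkJoinPool instead:
// scala.concurrent.forkjoin was deprecated in 2.12 and removed in 2.13.
pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))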
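
One way to check that the setting took effect is to read the parallelism level back from the collection's task support. This is a minimal sketch, assuming Scala 2.12 or later (where ForkJoinTaskSupport accepts a java.util.concurrent.ForkJoinPool) and, on 2.13 and later, the separate scala-parallel-collections module on the classpath. ParallelismCheck and levelAfterSetting are hypothetical names used only for illustration.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.mutable.ParArray

object ParallelismCheck {
  // Hypothetical helper: replaces the task support of a small parallel
  // array and returns the parallelism level the collection then reports.
  def levelAfterSetting(threads: Int): Int = {
    val pool = new ForkJoinPool(threads)
    try {
      val pc = ParArray(1, 2, 3)
      pc.tasksupport = new ForkJoinTaskSupport(pool)
      // For a fork-join task support this reflects the pool's parallelism.
      pc.tasksupport.parallelismLevel
    } finally pool.shutdown() // release the pool's threads when done
  }
}
```

Calling ParallelismCheck.levelAfterSetting(2) should report the size passed to the pool.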

The documentation also notes that:

The execution context task support is set to each parallel collection by default, so parallel collections reuse the same fork-join pool as the future API.

This means you should create a single pool and reuse it. The following approach leaks resources, because every call creates a new pool that is never shut down:

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  // A new pool is created on every call and never shut down
  parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
  parallel.map(_ * 2).seq
}

The right way is to reuse an existing pool:

val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(5))

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  parallel.tasksupport = taskSupport
  parallel.map(_ * 2).seq
}
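
When a dedicated pool is only needed for a single computation, another option is to shut it down as soon as the work is done. The sketch below is one possible way to do that, not part of the documentation quoted above. It assumes Scala 2.12 or later (where ForkJoinTaskSupport accepts a java.util.concurrent.ForkJoinPool) and, on 2.13 and later, the scala-parallel-collections module on the classpath. PoolLoan and withPool are hypothetical names.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object PoolLoan {
  // Hypothetical helper: lends a task support backed by a pool with
  // `parallelism` workers to `body`, then shuts the pool down,
  // even if `body` throws.
  def withPool[A](parallelism: Int)(body: ForkJoinTaskSupport => A): A = {
    val pool = new ForkJoinPool(parallelism)
    try body(new ForkJoinTaskSupport(pool))
    finally pool.shutdown()
  }

  def calculate(collection: Seq[Int]): Seq[Int] =
    withPool(5) { ts =>
      // Built via the companion's apply instead of .par, so the sketch
      // does not need the 2.13-only CollectionConverters import.
      val parallel = ParVector(collection: _*)
      parallel.tasksupport = ts
      // map runs to completion before withPool shuts the pool down
      parallel.map(_ * 2).seq
    }
}
```

Creating a pool per call pays the thread start-up cost every time, so a shared task support is likely still the better fit for code that is called frequently.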

Upvotes: 20

curious

Reputation: 2928

Every parallel collection holds a tasksupport object, which in turn holds a reference to a thread pool implementation.

So you can set the parallelism level by replacing the tasksupport object with one backed by a thread pool of the size you need. For example:

def f(numOfThreads: Int, n: Int): Unit = {
  import scala.collection.parallel._
  val coll = (0 to n).par
  // Back this collection with a pool of numOfThreads workers
  coll.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(numOfThreads))
  coll.foreach(i => {
    // do something
  })
}

f(2, 100)

For more information on configuring parallel collections, see http://docs.scala-lang.org/overviews/parallel-collections/configuration.html

Upvotes: 23
