Reputation: 10577
I have an iterable vals: Iterable[T] and a long-running function without any relevant side effects: f: (T => Unit). Right now this is applied to vals in the obvious way:
vals.foreach(f)
I would like the calls to f to be done concurrently (within reasonable limits). Is there an obvious function for this somewhere in the Scala base library? Something like:
Concurrent.foreach(8 /* Number of threads. */)(vals, f)
While f is reasonably long-running, it is short enough that I don't want the overhead of creating a thread for each call, so I am looking for something based on a thread pool.
Upvotes: 17
Views: 17944
Reputation: 431
Many of the answers from 2009 still use the old scala.actors.Futures._, which is no longer part of newer Scala releases. While Akka is the preferred way, a much more readable option is to just use parallel (.par) collections:
vals.foreach { v => f(v) }
becomes
vals.par.foreach { v => f(v) }
Alternatively, parMap can be more succinct, with the caveat that you need to remember the usual Scalaz imports. As usual, there's more than one way to do the same thing in Scala!
Upvotes: 24
Reputation: 21
You can use the Parallel Collections from the Scala standard library.
They're just like ordinary collections, but their operations run in parallel. You just need to add a par call before you invoke a collection operation.
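import scala.collection._
// On Scala 2.13+, .par needs the separate scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._
val array = new Array[String](10000)
// Each index is written by exactly one task, so the parallel writes do not conflict.
for (i <- (0 until 10000).par) array(i) = i.toString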
Upvotes: 0
Reputation: 35054
Scalaz has parMap. You would use it as follows:
import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive
This will equip every functor (including Iterable) with a parMap method, so you can just do:
vals.parMap(f)
You also get parFlatMap, parZipWith, etc.
Upvotes: 13
Reputation: 35054
The latest release of Functional Java has some higher-order concurrency features that you can use.
import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._
val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))
And then...
par.parMap(vals, f)
Remember to call shutdown on the pool.
Upvotes: 2
Reputation: 55115
I like the Futures answer. However, while it will execute concurrently, it will also return asynchronously, which is probably not what you want. The correct approach would be as follows:
import scala.actors.Futures._
vals map { x => future { f(x) } } foreach { _() }
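On Scala versions where scala.actors is not available, roughly the same idea can be sketched with scala.concurrent.Future from the standard library. This is only a sketch: the helper name foreachConcurrently and the choice of a fixed-size pool are assumptions made for illustration, not something from the answers here.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FutureForeach {
  // Hypothetical helper: applies f to every element using at most `threads`
  // worker threads and, if no call throws, returns once all calls are done.
  def foreachConcurrently[T](threads: Int)(vals: Iterable[T])(f: T => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per element; the fixed pool caps how many run at once.
      val all = Future.traverse(vals.toList)(v => Future(f(v)))
      // Block the caller until the combined future completes.
      // If a call to f threw, the exception is rethrown here.
      Await.result(all, Duration.Inf)
      ()
    } finally {
      // Stop accepting new tasks so the worker threads can exit.
      pool.shutdown()
    }
  }
}
```

A call would then look like FutureForeach.foreachConcurrently(8)(vals)(f). Creating the pool per call keeps the sketch short; for repeated use it would likely make more sense to share one pool.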
Upvotes: 10
Reputation: 297155
I'd use scala.actors.Futures:
vals.foreach(t => scala.actors.Futures.future(f(t)))
Upvotes: 2
Reputation: 9301
I had some issues using scala.actors.Futures in Scala 2.8 (it was buggy when I checked). Using java libs directly worked for me, though:
object Parallel {
  import java.util.{Timer, TimerTask}
  import java.util.concurrent.{Executors, TimeUnit}

  val cpus = java.lang.Runtime.getRuntime().availableProcessors

  // Runs op once on a timer thread after ms milliseconds.
  def afterDelay(ms: Long)(op: => Unit): Unit =
    new Timer().schedule(new TimerTask { override def run(): Unit = op }, ms)

  // Runs f(0) .. f(n - 1) on a thread pool and blocks until all calls finish.
  def repeat(n: Int, f: Int => Unit) = {
    val e = Executors.newCachedThreadPool // or newFixedThreadPool(cpus + 1)
    (0 until n).foreach(i => e.execute(new Runnable { def run(): Unit = f(i) }))
    e.shutdown()
    e.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }
}
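The same executor-based idea can be shaped to match the call the question asks for. The following is a minimal sketch under that assumption; the object name Concurrent and its foreach method are hypothetical and simply mirror the question's wording.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object Concurrent {
  // Hypothetical helper: applies f to every element of vals on a fixed pool
  // of `threads` workers and blocks until all submitted calls have finished.
  def foreach[T](threads: Int)(vals: Iterable[T], f: T => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      // Queue one task per element; at most `threads` of them run at a time.
      vals.foreach { v =>
        pool.execute(new Runnable { def run(): Unit = f(v) })
      }
    } finally {
      // No new tasks are accepted; tasks already queued still run.
      pool.shutdown()
    }
    // Wait for the queued tasks to drain.
    pool.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
    ()
  }
}
```

With this in scope, Concurrent.foreach(8)(vals, f) reads like the call in the question. Note that an exception thrown by f ends up in the worker thread's uncaught-exception handler rather than being propagated to the caller.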
Upvotes: 3