David Crawshaw

Reputation: 10577

Concurrent map/foreach in Scala

I have an iterable vals: Iterable[T] and a long-running function without any relevant side effects, f: (T => Unit). Right now this is applied to vals in the obvious way:

vals.foreach(f)

I would like the calls to f to be done concurrently (within reasonable limits). Is there an obvious function somewhere in the Scala base library? Something like:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

While f is reasonably long-running, it is short enough that I don't want the overhead of spawning a thread for each call, so I am looking for something based on a thread pool.

Upvotes: 17

Views: 17944

Answers (7)

Kei-ven

Reputation: 431

Many of the answers from 2009 still use the old scala.actors.Futures._, which is no longer available in newer versions of Scala. While Akka is the preferred way, a much more readable way is to just use parallel (.par) collections:

vals.foreach { v => f(v) }

becomes

vals.par.foreach { v => f(v) }

Alternatively, Scalaz's parMap can be more succinct, with the caveat that you need to remember the usual Scalaz imports. As usual, there's more than one way to do the same thing in Scala!

Upvotes: 24

Roland

Reputation: 21

You can use the Parallel Collections from the Scala standard library. They're just like ordinary collections, but their operations run in parallel. You just need to insert a par call before invoking a collection operation.

import scala.collection._

val array = new Array[String](10000)
for (i <- (0 until 10000).par) array(i) = i.toString

Upvotes: 0

Apocalisp

Reputation: 35054

Scalaz has parMap. You would use it as follows:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive

This will equip every functor (including Iterable) with a parMap method, so you can just do:

vals.parMap(f)

You also get parFlatMap, parZipWith, etc.

Upvotes: 13

Apocalisp

Reputation: 35054

The latest release of Functional Java has some higher-order concurrency features that you can use.

import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._

val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))

And then...

par.parMap(vals, f)

Remember to shut down the pool when you are done with it.

Upvotes: 2

Daniel Spiewak

Reputation: 55115

I like the Futures answer. However, while it will execute concurrently, it will also return asynchronously, which is probably not what you want. The correct approach would be as follows:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }
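
In newer Scala versions, where scala.actors is gone, the same start-everything-then-wait pattern can be sketched with scala.concurrent.Future running on a fixed-size thread pool. This is only a minimal sketch under assumptions: FutureForeach and its foreach signature are hypothetical names for illustration, not a standard-library API.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper, not part of the Scala standard library.
object FutureForeach {
  def foreach[T](threads: Int)(vals: Iterable[T])(f: T => Unit): Unit = {
    // The fixed pool bounds how many calls to f run at the same time.
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Start one Future per element; toList forces lazy collections.
      val futures = vals.map(v => Future(f(v))).toList
      // Block until every call has finished; a failed call rethrows here.
      futures.foreach(fut => Await.result(fut, Duration.Inf))
    } finally {
      pool.shutdown()
    }
  }
}
```

With this sketch, FutureForeach.foreach(8)(vals)(f) returns only after every call to f has completed.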

Upvotes: 10

Daniel C. Sobral

Reputation: 297155

I'd use scala.actors.Futures:

vals.foreach(t => scala.actors.Futures.future(f(t)))

Upvotes: 2

Jonathan Graehl

Reputation: 9301

I had some issues using scala.actors.Futures in Scala 2.8 (it was buggy when I checked). Using the Java libraries directly worked for me, though:

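object Parallel {
  // Number of available processors, useful for sizing a fixed pool.
  val cpus = Runtime.getRuntime.availableProcessors
  import java.util.{Timer, TimerTask}
  // Run op once, after ms milliseconds, on a new Timer thread.
  def afterDelay(ms: Long)(op: => Unit): Unit =
    new Timer().schedule(new TimerTask { override def run(): Unit = op }, ms)
  // Call f(0) .. f(n - 1) on a thread pool and block until all calls finish.
  def repeat(n: Int, f: Int => Unit): Boolean = {
    import java.util.concurrent._
    val e = Executors.newCachedThreadPool() // or newFixedThreadPool(cpus + 1)
    (0 until n).foreach(i => e.execute(new Runnable { def run(): Unit = f(i) }))
    e.shutdown()
    // Math.MAX_LONG no longer exists in current Scala; Long.MaxValue replaces it.
    e.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }
}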
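
A variant of the same executor approach that takes an Iterable[T] and a thread count, closer to the shape asked for in the question, might look like the following. It is a rough sketch, not a library function: ConcurrentForeach is a hypothetical name, and the fixed pool size is simply whatever the caller passes in.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical helper mirroring the Concurrent.foreach(8)(vals, f) shape from the question.
object ConcurrentForeach {
  def foreach[T](threads: Int)(vals: Iterable[T], f: T => Unit): Unit = {
    // At most `threads` calls to f run at once; the rest wait in the pool's queue.
    val pool = Executors.newFixedThreadPool(threads)
    try {
      vals.foreach(v => pool.execute(new Runnable { def run(): Unit = f(v) }))
    } finally {
      // Stop accepting new tasks; already submitted tasks still run.
      pool.shutdown()
    }
    // Block until all submitted tasks have finished.
    pool.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
    ()
  }
}
```

Note that with execute, an exception thrown by f is not propagated to the caller; it goes to the worker thread's uncaught exception handler. If failures matter, submit the tasks and inspect the returned futures instead.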

Upvotes: 3
