Ralph

Reputation: 32284

Multiple Scala actors servicing one task

I need to process multiple data values in parallel ("SIMD"). I can use the java.util.concurrent APIs (Executors.newFixedThreadPool()) to process several values in parallel using Future instances:

import java.util.concurrent.{Executors, Callable}

class ExecutorsTest {
  private class Process(value: Int)
      extends Callable[Int] {
    def call(): Int = {
      // Do some time-consuming task
      value
    }
  }

  val executorService = {
    val threads = Runtime.getRuntime.availableProcessors
    Executors.newFixedThreadPool(threads)
  }

  val processes = for (process <- 1 to 1000) yield new Process(process)

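  // invokeAll expects a java.util.Collection, so convert the Scala sequence first
  // (JavaConverters is deprecated in 2.13 in favour of scala.jdk.CollectionConverters)
  import scala.collection.JavaConverters._
  val futures = executorService.invokeAll(processes.asJava)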

  // Wait for futures
}

How do I do the same thing using Actors? I do not believe that I want to "feed" all of the processes to a single actor because the actor will then execute them sequentially.

Do I need to create multiple "processor" actors with a "dispatcher" actor that sends an equal number of processes to each "processor" actor?

Upvotes: 2

Views: 950

Answers (2)

Rex Kerr

Reputation: 167891

If you just want fire-and-forget processing, why not use Scala futures?

import scala.actors.Futures._
def example = {
  val answers = (1 to 4).map(x => future {
    Thread.sleep(x*1000)
    println("Slept for "+x)
    x
  })
  val t0 = System.nanoTime
  awaitAll(1000000,answers: _*)  // Number is timeout in ms
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  answers.map(_()).sum
}

scala> example
Slept for 1
Slept for 2
Slept for 3
Slept for 4
4.000 seconds elapsed
res1: Int = 10

Basically, you put the code you want to run inside a future { } block, which immediately returns a future. Apply the future to get its answer (this blocks until the computation is done), or use awaitAll with a timeout to wait until all of them are done.


Update: scala.actors is deprecated as of 2.11; the way to do this now is with scala.concurrent.Future (available since 2.10). A translation of the above code is:

import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

def example = {
  val answers = Future.sequence(
    (1 to 4).map(x => Future {
      Thread.sleep(x*1000)
      println("Slept for "+x)
      x
    })
  )
  val t0 = System.nanoTime
  val completed = Await.result(answers, Duration(1000, SECONDS))
  val t1 = System.nanoTime
  printf("%.3f seconds elapsed\n",(t1-t0)*1e-9)
  completed.sum
}
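
Applied to the workload in the question, the same pattern can run on a fixed-size thread pool instead of the global execution context. The following is only a minimal sketch: `process` is a hypothetical stand-in for the time-consuming task, and `FixedPoolExample`, the pool size, and the one-minute timeout are assumptions, not part of the original code.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FixedPoolExample {
  // Hypothetical stand-in for the time-consuming task in the question.
  def process(value: Int): Int = value

  def run(): Int = {
    val threads = Runtime.getRuntime.availableProcessors
    val pool = Executors.newFixedThreadPool(threads)
    // Wrap the Java pool so that Future { ... } bodies are scheduled on it.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per value; traverse collects them into a single
      // Future holding all results in input order.
      val all = Future.traverse((1 to 1000).toList)(v => Future(process(v)))
      // Block until every task has finished (assumed timeout: one minute).
      Await.result(all, 1.minute).sum
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling `FixedPoolExample.run()` blocks the caller until all 1000 values are processed, which mirrors the "wait for futures" step in the question.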

Upvotes: 10

mpilquist

Reputation: 3855

If you can use Akka, take a look at the ActorPool support: http://doc.akka.io/routing-scala

It lets you specify parameters about how many actors you want running in parallel and then dispatches work to those actors.
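
A rough sketch of that idea follows. Note that it does not use ActorPool itself: it assumes a later classic Akka release (with the akka-actor dependency on the classpath) and swaps in the `RoundRobinPool` router. `Worker`, `RouterExample`, the doubling task, and the timeouts are all hypothetical names and values chosen for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical worker: processes one value per message and replies with the result.
class Worker extends Actor {
  def receive = {
    case value: Int => sender() ! value * 2 // stand-in for the time-consuming task
  }
}

object RouterExample {
  def run(): Int = {
    val system = ActorSystem("example")
    import system.dispatcher // execution context for Future.sequence
    implicit val timeout: Timeout = Timeout(30.seconds)
    try {
      // The router plays the "dispatcher" role: it creates a pool of workers
      // and hands each incoming message to the next one in round-robin order.
      val workers = Runtime.getRuntime.availableProcessors
      val router = system.actorOf(RoundRobinPool(workers).props(Props(new Worker)), "router")
      // Ask for each value and gather all replies into one Future.
      val replies = Future.sequence((1 to 1000).map(v => (router ? v).mapTo[Int]))
      Await.result(replies, 1.minute).sum
    } finally {
      system.terminate()
    }
  }
}
```

Each worker still handles its own messages sequentially, so the degree of parallelism is set by the pool size passed to the router.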

Upvotes: 3
