chenzhongpu

Reputation: 6871

How to execute multiple tasks in parallel?

I am taking the Parallel Programming course, which presents the following parallel interface:

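def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val tb = task { taskB } // start taskB on another thread first
  val ta = taskA          // then run taskA on the current thread
  (ta, tb.join())         // join only after both have been started
}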

The following is wrong, because calling join() right away blocks until taskB has finished, so the two tasks never overlap:

def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
  val ta = taskA
  val tb = task {taskB}.join()
  (ta, tb)
}

The full interface is at https://gist.github.com/ChenZhongPu/fe389d30626626294306264a148bd2aa

It also shows the right way to execute four tasks:

def parallel[A, B, C, D](taskA: => A, taskB: => B, taskC: => C, taskD: => D): (A, B, C, D) = {
    val ta = task { taskA }
    val tb = task { taskB }
    val tc = task { taskC }
    val td = taskD
    (ta.join(), tb.join(), tc.join(), td)
}

My question: if I don't know the number of tasks in advance (I have a List of tasks), how can I call join on each task correctly?

tasks.map(_.join()) // wrong

Edit

A similar discussion also takes place in the course forum thread "Discuss this week's module: Parallel Programming".

Upvotes: 6

Views: 7462

Answers (3)

Paul

Reputation: 27413

Looking around for a practical way to build parallel(), I found it can be built from Future. The paradigm will seem familiar to anyone using modern JavaScript Promises:

import scala.concurrent.{Await,Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

def parallel[A, B](taskA: =>A, taskB: =>B): (A,B) = {
  val fB:Future[B] = Future { taskB }
  val a:A = taskA
  val b:B = Await.result(fB, Duration.Inf)
  (a,b)
}

This hands taskB off to a thread from the global execution context and runs taskA on the calling thread. Once taskA is done, we wait, forever if necessary, for fB to finish. Beware: I haven't tested exceptions with this setup, and it might stall or misbehave.
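The same idea can be stretched to a list of tasks whose length is not known in advance, which is what the question asks about. The following is only a sketch: `parallelAll` and `ParallelAllSketch` are names made up for illustration, and it assumes the tasks arrive as a strict `Seq` of `() => A` thunks so that every Future is started before any waiting happens.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object ParallelAllSketch {
  // Hypothetical helper (not part of the answer above): runs every thunk
  // as a Future and blocks until all results are available, in input order.
  def parallelAll[A](tasks: Seq[() => A]): Seq[A] = {
    // Start all futures first; map on a strict Seq launches each one right away.
    val futures = tasks.map(t => Future { t() })
    // Future.sequence turns Seq[Future[A]] into Future[Seq[A]].
    Await.result(Future.sequence(futures), Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    val tasks: Seq[() => Int] = Seq(() => 1 + 1, () => 2 * 3, () => 10 - 3)
    // Results come back in input order: 2, 6, 7
    println(parallelAll(tasks))
  }
}
```

If any task throws, Await.result rethrows that exception on the calling thread once the combined future has failed.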

Upvotes: 1

TeWu

Reputation: 6526

Using the framework from the Parallel Programming course

You can implement the method like this:

def parallel[A](tasks: (() => A)*): Seq[A] = {
  if (tasks.isEmpty) Nil
  else {
    val pendingTasks = tasks.tail.map(t => task { t() })
    tasks.head() +: pendingTasks.map(_.join())
  }
}

(Note that you can't have a variable number of by-name arguments, though this may change.)

Then use it like this:

object ParallelUsage {
  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()

    // Use a list of tasks:
    val tasks = List(longTask _, longTask _, longTask _, longTask _)
    val results = parallel(tasks: _*)
    println(results)

    // or pass any number of individual tasks directly:
    println(parallel(longTask, longTask, longTask))
    println(parallel(longTask, longTask))
    println(parallel(longTask))
    println(parallel())

    println(s"Done in ${ System.currentTimeMillis() - start } ms")
  }

  def longTask() = {
    println("starting longTask execution")
    Thread.sleep(1000)
    42 + Math.random
  }
}
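For readers without the course framework on the classpath, here is a rough way to try the approach above. It is a sketch under assumptions: `TaskSketch` is a made-up name, and its `task` helper is a simplified stand-in built on java.util.concurrent.ForkJoinPool, not the course's actual implementation. The `parallel` method is copied from the answer.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinTask, RecursiveTask}

object TaskSketch {
  // Assumption: a simplified stand-in for the course's `task` helper.
  private val pool = new ForkJoinPool()

  def task[T](body: => T): ForkJoinTask[T] = {
    val t = new RecursiveTask[T] {
      def compute(): T = body
    }
    pool.execute(t) // schedule on the pool without blocking the caller
    t
  }

  // Same shape as the varargs `parallel` shown above.
  def parallel[A](tasks: (() => A)*): Seq[A] =
    if (tasks.isEmpty) Nil
    else {
      // Start every task except the first, then run the first one here.
      val pending = tasks.tail.map(t => task { t() })
      tasks.head() +: pending.map(_.join())
    }

  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    val results = parallel(
      () => { Thread.sleep(200); 1 },
      () => { Thread.sleep(200); 2 },
      () => { Thread.sleep(200); 3 }
    )
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"$results in $elapsedMs ms")
  }
}
```

On a multi-core machine the elapsed time should be well below the 600 ms that a sequential run would need, although the exact figure depends on the pool's parallelism.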

Using Scala's parallel collections

You can't go simpler than this:

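// On Scala 2.13+, .par needs the scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._
val tasks = Vector(longTask _, longTask _, longTask _)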
val results = tasks.par.map(_()).seq

Upvotes: 3

flavian

Reputation: 28511

Inspired by Future.sequence and cheating a bit. You need a Task implementation that's also a Monad to make this design work.

  /** Transforms a `TraversableOnce[Task[A]]` into a `Task[TraversableOnce[A]]`.
   *  Useful for reducing many `Task`s into a single `Task`.
   */
  def parallel[
    A,
    M[X] <: TraversableOnce[X]
  ](in: M[Task[A]])(
    implicit cbf: CanBuildFrom[M[Task[A]], A, M[A]],
    executor: ExecutionContext
  ): Task[M[A]] = {
    in.foldLeft(Task.point(cbf(in))) {
      (fr, fa) => for (r <- fr; a <- fa) yield (r += a)
    }.map(_.result())(executor)
  }

This can execute operations in parallel for most Scala collections. The only condition is that Task defines map and flatMap, whatever the implementation. The particular collection type is abstracted over through the implicit CanBuildFrom builder, which is internal to the Scala library.
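To make the fold easier to follow, here is a reduced sketch of the same pattern. It makes two simplifying assumptions: the standard library's Future stands in for Task (since it already has map and flatMap), and the collection is fixed to List so that no CanBuildFrom is needed. `sequenceList` and `FoldSequenceSketch` are illustrative names only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object FoldSequenceSketch {
  // Hypothetical helper: combines already-started futures into a single
  // future of a list, using the same foldLeft + for-comprehension shape.
  def sequenceList[A](in: List[Future[A]]): Future[List[A]] =
    in.foldLeft(Future.successful(List.empty[A])) { (acc, fa) =>
      // Prepend each result as it is combined; order is restored below.
      for (r <- acc; a <- fa) yield a :: r
    }.map(_.reverse)

  def main(args: Array[String]): Unit = {
    // The futures start running as soon as they are created here.
    val futures = List(1, 2, 3).map(n => Future { n * 10 })
    println(Await.result(sequenceList(futures), Duration.Inf)) // prints List(10, 20, 30)
  }
}
```

The fold itself only combines results. The parallelism comes from the fact that each Future begins executing when it is created, before the fold runs.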

Upvotes: 0
