igreenfield
igreenfield

Reputation: 1702

Future in callback run in the same thread

When i run this code:

val intList = List(1, 6, 8)

val listOfFutures = intList.map{
  i => future {
    println("future " +i)
    Thread.sleep(i* 1000)
    if (i == 12)
      throw new Exception("6 is not legal.")
    i
  }

}
val futureOfList = Future.sequence(listOfFutures)

futureOfList onSuccess{
  case int => {
    logInfo("onSuccess")
    int.foreach(i => {
      Future{
      logInfo("h : " + i)
      }
    })
  }
}

futureOfList onFailure{
  case int => {
    logInfo("onFailure")
  }
}

logInfo("after")

The nested futures run all with the same thread:

future 1
2013/11/24 11:55:00.002 [ModelUtilsTest] [main]: INFO: [] after
future 6
future 8
2013/11/24 11:55:14.878 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] onSuccess 
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 1 
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 6 
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 8

Why it is happen? i need the inner future to run in parallel.

Upvotes: 3

Views: 1431

Answers (2)

som-snytt
som-snytt

Reputation: 39587

The Future implementation checks whether it is going to run something on the ForkJoinPool that owns the current thread. In that case, it will ForkJoinTask.fork(), which adds the new work unit to the work queue of the current worker thread. Any other thread has to steal that work to execute it. You wouldn't expect to see a uniform distribution of your tasks over the available threads, especially with trivial tasks.

In the following run, worker-9 bears the brunt:

scala> val f = Future sequence (1 to 20 map (i => Future(i)))
f: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = scala.concurrent.impl.Promise$DefaultPromise@47fe4ff8

scala> f onSuccess { case is => is foreach (i => Future(println(s"$i on ${Thread.currentThread}"))) }

scala> 2 on Thread[ForkJoinPool-1-worker-9,5,main]
5 on Thread[ForkJoinPool-1-worker-15,5,main]
4 on Thread[ForkJoinPool-1-worker-3,5,main]
6 on Thread[ForkJoinPool-1-worker-9,5,main]
1 on Thread[ForkJoinPool-1-worker-5,5,main]
12 on Thread[ForkJoinPool-1-worker-7,5,main]
11 on Thread[ForkJoinPool-1-worker-9,5,main]
15 on Thread[ForkJoinPool-1-worker-9,5,main]
16 on Thread[ForkJoinPool-1-worker-9,5,main]
17 on Thread[ForkJoinPool-1-worker-9,5,main]
18 on Thread[ForkJoinPool-1-worker-9,5,main]
19 on Thread[ForkJoinPool-1-worker-9,5,main]
20 on Thread[ForkJoinPool-1-worker-9,5,main]
3 on Thread[ForkJoinPool-1-worker-1,5,main]
10 on Thread[ForkJoinPool-1-worker-13,5,main]
9 on Thread[ForkJoinPool-1-worker-11,5,main]
8 on Thread[ForkJoinPool-1-worker-3,5,main]
7 on Thread[ForkJoinPool-1-worker-15,5,main]
14 on Thread[ForkJoinPool-1-worker-7,5,main]
13 on Thread[ForkJoinPool-1-worker-5,5,main]

Upvotes: 2

flavian
flavian

Reputation: 28511

You have two ways to achieve parallelism easily:

First, you can use the traverse method from the Future companion object.

import scala.collection.breakOut
import scala.concurrent.Future;
import scala.concurrent.ExecutionContext.Implicits.global

val modifiedList = Future.traverse(myList)(myFunc)

Or you can use par collections, which run the mapping in parallel.

val modifiedList = myList.par.map(myFunc)(breakOut)

Upvotes: 1

Related Questions