Reputation: 1702
When i run this code:
val intList = List(1, 6, 8)
val listOfFutures = intList.map{
i => future {
println("future " +i)
Thread.sleep(i* 1000)
if (i == 12)
throw new Exception("6 is not legal.")
i
}
}
val futureOfList = Future.sequence(listOfFutures)
futureOfList onSuccess{
case int => {
logInfo("onSuccess")
int.foreach(i => {
Future{
logInfo("h : " + i)
}
})
}
}
futureOfList onFailure{
case int => {
logInfo("onFailure")
}
}
logInfo("after")
The nested futures run all with the same thread:
future 1
2013/11/24 11:55:00.002 [ModelUtilsTest] [main]: INFO: [] after
future 6
future 8
2013/11/24 11:55:14.878 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] onSuccess
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 1
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 6
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 8
Why it is happen? i need the inner future to run in parallel.
Upvotes: 3
Views: 1431
Reputation: 39587
The Future
implementation checks whether it is going to run something on the ForkJoinPool
that owns the current thread. In that case, it will ForkJoinTask.fork()
, which adds the new work unit to the work queue of the current worker thread. Any other thread has to steal that work to execute it. You wouldn't expect to see a uniform distribution of your tasks over the available threads, especially with trivial tasks.
In the following run, worker-9
bears the brunt:
scala> val f = Future sequence (1 to 20 map (i => Future(i)))
f: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = scala.concurrent.impl.Promise$DefaultPromise@47fe4ff8
scala> f onSuccess { case is => is foreach (i => Future(println(s"$i on ${Thread.currentThread}"))) }
scala> 2 on Thread[ForkJoinPool-1-worker-9,5,main]
5 on Thread[ForkJoinPool-1-worker-15,5,main]
4 on Thread[ForkJoinPool-1-worker-3,5,main]
6 on Thread[ForkJoinPool-1-worker-9,5,main]
1 on Thread[ForkJoinPool-1-worker-5,5,main]
12 on Thread[ForkJoinPool-1-worker-7,5,main]
11 on Thread[ForkJoinPool-1-worker-9,5,main]
15 on Thread[ForkJoinPool-1-worker-9,5,main]
16 on Thread[ForkJoinPool-1-worker-9,5,main]
17 on Thread[ForkJoinPool-1-worker-9,5,main]
18 on Thread[ForkJoinPool-1-worker-9,5,main]
19 on Thread[ForkJoinPool-1-worker-9,5,main]
20 on Thread[ForkJoinPool-1-worker-9,5,main]
3 on Thread[ForkJoinPool-1-worker-1,5,main]
10 on Thread[ForkJoinPool-1-worker-13,5,main]
9 on Thread[ForkJoinPool-1-worker-11,5,main]
8 on Thread[ForkJoinPool-1-worker-3,5,main]
7 on Thread[ForkJoinPool-1-worker-15,5,main]
14 on Thread[ForkJoinPool-1-worker-7,5,main]
13 on Thread[ForkJoinPool-1-worker-5,5,main]
Upvotes: 2
Reputation: 28511
You have two ways to achieve parallelism easily:
First, you can use the traverse
method from the Future
companion object.
import scala.collection.breakOut
import scala.concurrent.Future;
import scala.concurrent.ExecutionContext.Implicits.global
val modifiedList = Future.traverse(myList)(myFunc)
Or you can use par
collections, which run the mapping in parallel.
val modifiedList = myList.par.map(myFunc)(breakOut)
Upvotes: 1