Reputation: 15296
I want to perform "flows" where each flow is performed in parallel. Each flow in itself performs operations using futures:
def doFlow(...): Seq[Future[Something]] = {
(1 to 10) map {
Future {
Something(...)
}
}
}
val sequence: Seq[Seq[Future[Something]]] = (1 to 10) map {
iter => doFlow(...)
}
// now I want to wait for all of them to complete:
val flat: Seq[Future[Something]] = sequence.flatten
val futureSeq = Future.sequence(flat)
futureSeq.onComplete {
...
case Success(val) => {...}
}
I am printing a log of the completions and I see that they are running sequentially rather than in parallel like I want it to
=======================
First started at Wed Apr 18 12:02:22 IDT 2018
Last ended at Wed Apr 18 12:02:28 IDT 2018
Took 4.815 seconds
=======================
First started at Wed Apr 18 12:02:28 IDT 2018
Last ended at Wed Apr 18 12:02:35 IDT 2018
Took 4.335 seconds
=======================
First started at Wed Apr 18 12:02:35 IDT 2018
Last ended at Wed Apr 18 12:02:41 IDT 2018
Took 3.83 seconds
...
...
Upvotes: 0
Views: 648
Reputation: 4017
Works on my machine:
import ExecutionContext.Implicits.global
def doFlow(chunk: Int): Seq[Future[Int]] = {
(1 to 5) map { i =>
Future {
println(s"--> chunk $chunk idx $i")
Thread.sleep(1000)
println(s"<-- chunk $chunk idx $i")
0
}
}
}
val sequence: Seq[Seq[Future[Int]]] = (1 to 5) map {
iter => doFlow(iter)
}
val flat: Seq[Future[Int]] = sequence.flatten
val futureSeq = Future.sequence(flat)
Await.ready(futureSeq, scala.concurrent.duration.Duration.Inf)
Output sample:
--> chunk 1 idx 2
--> chunk 1 idx 4
--> chunk 1 idx 1
--> chunk 1 idx 3
--> chunk 2 idx 1
--> chunk 1 idx 5
--> chunk 2 idx 3
--> chunk 2 idx 2
<-- chunk 1 idx 2
<-- chunk 2 idx 1
<-- chunk 1 idx 3
--> chunk 2 idx 5
--> chunk 3 idx 1
<-- chunk 1 idx 1
<-- chunk 1 idx 5
<-- chunk 1 idx 4
--> chunk 3 idx 3
--> chunk 2 idx 4
It processes 8 tasks at a time.
Do you have any internal synchronization inside Something
which can introduce blocking?
Upvotes: 3
Reputation: 27595
Future.sequence(xs.map { x => futureY })
is the way to go.
However, if your future finish immediately, or if ExecutorContext process only 1 of them at a time, they will be effectively sequential.
Your Futures do take time to execute, so I would investigate the ExecutionContext. ExecutionContext.Implicits.global
uses as many threads as host has CPUs (so, a single core machine will have ExecutorService with 1 thread).
Defining ExecutorContext for SingleThreadExecutor will also result in running things sequentially.
Then there is also a possibility of blocking inside a Future. Or tracing wrong things.
To figure out more we would have to look what Something(...)
does and what is the ExecutorContext you use.
Upvotes: 0