Avba

Reputation: 15296

How to perform future of futures in parallel and wait for them to complete (running in parallel)

I want to run several "flows" in parallel. Each flow itself performs its operations using futures:

def doFlow(...): Seq[Future[Something]] = {
  // map needs a function; each call creates (and starts) one Future
  (1 to 10) map { _ =>
    Future {
      Something(...)
    }
  }
}


val sequence: Seq[Seq[Future[Something]]] = (1 to 10) map {
  iter => doFlow(...)
}

// now I want to wait for all of them to complete:

val flat: Seq[Future[Something]] = sequence.flatten

val futureSeq = Future.sequence(flat)

futureSeq.onComplete {
  ...
  case Success(values) => {...} // `val` is a reserved word in Scala, so it cannot be a pattern name
}

I am logging the start and completion times, and I see that the flows run sequentially rather than in parallel as I want:

=======================
First started at Wed Apr 18 12:02:22 IDT 2018
Last ended at Wed Apr 18 12:02:28 IDT 2018
Took 4.815 seconds
=======================
First started at Wed Apr 18 12:02:28 IDT 2018
Last ended at Wed Apr 18 12:02:35 IDT 2018
Took 4.335 seconds
=======================
First started at Wed Apr 18 12:02:35 IDT 2018
Last ended at Wed Apr 18 12:02:41 IDT 2018
Took 3.83 seconds
...
...

Upvotes: 0

Views: 648

Answers (2)

simpadjo

Reputation: 4017

Works on my machine:

  import scala.concurrent.{Await, ExecutionContext, Future}
  import ExecutionContext.Implicits.global

  def doFlow(chunk: Int): Seq[Future[Int]] = {
    (1 to 5) map { i =>
      Future {
        println(s"--> chunk $chunk idx $i")
        Thread.sleep(1000)
        println(s"<-- chunk $chunk idx $i")
        0
      }
    }
  }


  val sequence: Seq[Seq[Future[Int]]] = (1 to 5) map {
    iter => doFlow(iter)
  }
  val flat: Seq[Future[Int]] = sequence.flatten
  val futureSeq = Future.sequence(flat)
  Await.ready(futureSeq, scala.concurrent.duration.Duration.Inf)

Output sample:

--> chunk 1 idx 2
--> chunk 1 idx 4
--> chunk 1 idx 1
--> chunk 1 idx 3
--> chunk 2 idx 1
--> chunk 1 idx 5
--> chunk 2 idx 3
--> chunk 2 idx 2
<-- chunk 1 idx 2
<-- chunk 2 idx 1
<-- chunk 1 idx 3
--> chunk 2 idx 5
--> chunk 3 idx 1
<-- chunk 1 idx 1
<-- chunk 1 idx 5
<-- chunk 1 idx 4
--> chunk 3 idx 3
--> chunk 2 idx 4

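It runs 8 tasks at a time (presumably the number of cores on this machine, since the global ExecutionContext creates one thread per core by default). Do you have any internal synchronization inside Something that could introduce blocking?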
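To illustrate the kind of blocking meant here, a minimal sketch, assuming Something(...) guards its work with a single shared lock (`sharedLock`, `LockSerializationDemo` and `maxConcurrentInsideLock` are hypothetical names for this illustration). Even though every Future is submitted to a multi-threaded pool, only one of them can be inside the `synchronized` block at a time, so the flows end up running one after another.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LockSerializationDemo {
  // Hypothetical stand-in for a Something(...) that guards its work with a shared lock.
  private val sharedLock = new Object

  /** Runs `n` futures that all enter `sharedLock` and returns the highest
    * number of futures observed inside the critical section at once. */
  def maxConcurrentInsideLock(n: Int)(implicit ec: ExecutionContext): Int = {
    val active = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)
    val futures = (1 to n).map { _ =>
      Future {
        sharedLock.synchronized {
          val now = active.incrementAndGet()
          maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
          Thread.sleep(50) // simulated work while holding the lock
          active.decrementAndGet()
        }
      }
    }
    Await.result(Future.sequence(futures), 30.seconds)
    maxSeen.get()
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    println(s"max futures inside the lock at once: ${maxConcurrentInsideLock(8)}")
  }
}
```

However many threads the pool has, the reported maximum stays at 1, which matches the "sequential" symptom from the question.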

Upvotes: 3

Mateusz Kubuszok

Reputation: 27595

Future.sequence(xs.map { x => futureY })

is the way to go.
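A minimal sketch of that pattern, assuming a hypothetical `square` function standing in for `futureY`. The futures start as soon as `map` creates them; `Future.sequence` does not start anything, it only collects the results into a single `Future[List[Int]]`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceDemo {
  // Hypothetical async operation standing in for `futureY`.
  def square(x: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future { x * x }

  // `map` creates (and therefore schedules) every Future eagerly;
  // Future.sequence only combines them, preserving the input order.
  def squaresAll(xs: List[Int])(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.sequence(xs.map { x => square(x) })

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    println(Await.result(squaresAll(List(1, 2, 3, 4)), 10.seconds))
  }
}
```

Running `main` prints `List(1, 4, 9, 16)`.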

However, if your futures finish immediately, or if the ExecutionContext processes only one of them at a time, they will effectively run sequentially.

Your futures do take time to execute, so I would investigate the ExecutionContext. ExecutionContext.Implicits.global uses as many threads as the host has CPU cores (so a single-core machine gets an ExecutorService with 1 thread).
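One way to investigate the ExecutionContext, sketched under the assumption that you can switch to a dedicated pool: print the number of available processors (the global context sizes its default pool from it) and compare against a fixed-size pool whose size you choose yourself. `PoolSizeDemo`, `peakConcurrency` and the pool size of 4 are illustrative choices, not part of the original code.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolSizeDemo {
  // Runs `tasks` sleeping futures on `ec` and returns the peak number running at once.
  def peakConcurrency(tasks: Int)(implicit ec: ExecutionContext): Int = {
    val active = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    val futures = (1 to tasks).map { _ =>
      Future {
        val now = active.incrementAndGet()
        peak.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
        Thread.sleep(100) // simulated blocking work
        active.decrementAndGet()
      }
    }
    Await.result(Future.sequence(futures), 30.seconds)
    peak.get()
  }

  def main(args: Array[String]): Unit = {
    // The global ExecutionContext derives its default pool size from this value.
    println(s"available processors: ${Runtime.getRuntime.availableProcessors}")

    // A dedicated pool with an explicit size (4 is an arbitrary example value).
    val pool = Executors.newFixedThreadPool(4)
    val ec = ExecutionContext.fromExecutorService(pool)
    try println(s"peak concurrency with 4 threads: ${peakConcurrency(8)(ec)}")
    finally pool.shutdown()
  }
}
```

The peak can never exceed the pool size, so a peak of 1 on your machine would point at the ExecutionContext rather than at Future.sequence.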

Defining an ExecutionContext on top of a single-thread executor (e.g. Executors.newSingleThreadExecutor) will also result in things running sequentially.
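A minimal sketch of the single-thread case, assuming a context built with `Executors.newSingleThreadExecutor` (`SingleThreadDemo` and `threadsUsed` are hypothetical names). Every Future is queued on the same worker thread, so the set of thread names that ran the tasks has exactly one element and the tasks cannot overlap.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SingleThreadDemo {
  // Runs `n` futures on `ec` and returns the distinct names of the threads that ran them.
  def threadsUsed(n: Int)(implicit ec: ExecutionContext): Set[String] = {
    val futures = (1 to n).map { _ =>
      Future {
        Thread.sleep(20) // simulated work
        Thread.currentThread().getName
      }
    }
    Await.result(Future.sequence(futures), 30.seconds).toSet
  }

  def main(args: Array[String]): Unit = {
    val single = Executors.newSingleThreadExecutor()
    val ec = ExecutionContext.fromExecutorService(single)
    // All futures share one worker thread, so they run one after another.
    try println(s"threads used: ${threadsUsed(5)(ec)}")
    finally single.shutdown()
  }
}
```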

There is also the possibility of blocking inside a Future, or of tracing the wrong things.

To figure out more, we would need to see what Something(...) does and which ExecutionContext you use.

Upvotes: 0
