xSmorpheusSx
xSmorpheusSx

Reputation: 103

List of Future is not executed in parallel

I don't understand why this code is not executed in parallel.

implicit val ec: ExecutionContext = system.dispatchers.lookup("db-non-blocking")

println(s"Cores ${Runtime.getRuntime().availableProcessors()}")

def print(i: Int): Future[Unit] = Future {
  if (i == 1) {
    Thread.sleep(5000)
  }
  println(s"Finished $i at ${new Date(System.currentTimeMillis())} (Thread ${Thread.currentThread().getName})")
}

Future {
  println(s"${new Date(System.currentTimeMillis())} (Thread ${Thread.currentThread().getName})")
  Seq(1, 2, 3, 4, 5)
}.map(d => {
  d.map(u => print(u))
})

Running this code the output is the following.

Cores 16
Tue Jun 18 14:19:15 CEST 2024 (Thread plugin-system-db-non-blocking-16)
Finished 1 at Tue Jun 18 14:19:20 CEST 2024 (Thread plugin-system-db-non-blocking-16)
Finished 2 at Tue Jun 18 14:19:20 CEST 2024 (Thread plugin-system-db-non-blocking-16)
Finished 3 at Tue Jun 18 14:19:20 CEST 2024 (Thread plugin-system-db-non-blocking-16)
Finished 4 at Tue Jun 18 14:19:20 CEST 2024 (Thread plugin-system-db-non-blocking-16)
Finished 5 at Tue Jun 18 14:19:20 CEST 2024 (Thread plugin-system-db-non-blocking-16)

After the first Future has completed, the map returns a list of Futures. I am confused why the created Futures are not executed asynchronously. All created threads run on the same dispatcher. The pool size of this dispatcher is sufficient to run 5 threads in parallel. It seems, all futures created inside the map function are executed on the same thread.

dispatcher-non-blocking {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 50
  }
  throughput = 1
}

Why are these Futures not executed in parallel and run on the same thread with id 16? If I run this code with different dispatcher, all future get executed in parallel. If the "Seq" is not wrapped inside a future, all created futures run in parallel but why do they run synchronous if created inside map.

Upvotes: 1

Views: 141

Answers (1)

Andrew Nolan
Andrew Nolan

Reputation: 2107

Seq is processed sequentially. A Future only runs after it is constructed. In this case, using Future.sequence(d.map(u => print(u))) should do the trick.

In scala 2.12 and older,the parallel collection types could work. In 2.13 and 3.x, you will need to add the module to your dependencies.

import scala.collection.parallel.ParSeq

Future {
    println(s"${new Date(System.currentTimeMillis())} (Thread ${Thread.currentThread().getName})")
    ParSeq(1, 2, 3, 4, 5) // <-- ParSeq instead of Seq
  }.map(d => {
    d.map(u => print(u))
  })
Cores 16
Tue Jun 18 12:31:36 EDT 2024 (Thread pool-1-thread-1)
Finished 2 at Tue Jun 18 12:31:36 EDT 2024 (Thread pool-1-thread-4)
Finished 3 at Tue Jun 18 12:31:36 EDT 2024 (Thread pool-1-thread-1)
Finished 4 at Tue Jun 18 12:31:36 EDT 2024 (Thread pool-1-thread-4)
Finished 5 at Tue Jun 18 12:31:36 EDT 2024 (Thread pool-1-thread-1)
Blocking 1 at Tue Jun 18 12:31:36 EDT 2024 (Thread pool-1-thread-3)
Finished 1 after Blocking at Tue Jun 18 12:31:41 EDT 2024 (Thread pool-1-thread-3)
Finished 1 at Tue Jun 18 12:31:41 EDT 2024 (Thread pool-1-thread-3)

Upvotes: 1

Related Questions