Roberto A.

Reputation: 402

Why are Scala Futures running only 2 at a time?

I am trying to use Scala Futures to run 50+ tasks at the same time. Unfortunately, I can only get them to run 2 at a time. Can somebody tell me what I am doing wrong, or how to increase the parallelism?

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Test {
  def main(args: Array[String]): Unit = {
    // Start five futures on the default global execution context
    def go() = {
      val list = Seq(
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")},
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}
      )
      Future.sequence(list)
    }
    // Block the main thread until all five futures have completed
    Await.result(go(), Duration.Inf)
  }
}

The result I get is

start 1
start 2
stop 1
start 3
stop 2
start 4
stop 3
start 5
stop 4
stop 5

Why am I not getting the following?

start 1
start 2
start 3
start 4
start 5
stop 1
stop 2
stop 3
stop 4
stop 5

Upvotes: 3

Views: 1132

Answers (2)

chad

Reputation: 7519

As a parallel execution construct, a Scala Future needs an ExecutionContext backing it; typically, this ExecutionContext has a pool of threads from which it draws to execute your future logic.

The most common way of making an ExecutionContext available is to bring an implicitly defined thread pool into scope, so that it is passed in to the Future construction logic. This is done by importing the default definition, like this:

import scala.concurrent.ExecutionContext.Implicits.global

As you will see in the API documentation, this default thread pool sets the number of available threads to the number of processor cores, so a machine with two cores runs only two of these sleeping futures at a time. In other words:

parallelism level == Runtime.getRuntime.availableProcessors
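If you would rather keep the global context, one option is to mark the sleeping part of each task with `scala.concurrent.blocking`, which lets the global pool start extra threads (up to an internal limit) while a thread is blocked. The following is only a minimal sketch under that assumption; the names `BlockingDemo` and `runAll` are made up for illustration and are not part of the question.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; not from the original question.
object BlockingDemo {
  // Starts n futures that each sleep for sleepMillis and then return their index.
  // Wrapping the sleep in `blocking` hints to the global pool that the thread
  // is about to block, so the pool may add a compensating thread.
  def runAll(n: Int, sleepMillis: Long): Seq[Int] = {
    val tasks = (1 to n).map { i =>
      Future {
        println(s"start $i")
        blocking { Thread.sleep(sleepMillis) }
        println(s"stop $i")
        i
      }
    }
    // Wait for every future and collect the results in submission order
    Await.result(Future.sequence(tasks), Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    println(s"available processors: ${Runtime.getRuntime.availableProcessors}")
    runAll(5, 1000L)
  }
}
```

The hint only has this effect on execution contexts that support it, such as the global one; a plain fixed thread pool ignores it.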

Upvotes: 9

Roberto A.

Reputation: 402

got it! thank you both

import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object Test {
  def main(args: Array[String]): Unit = {
    // A dedicated pool with far more threads than tasks, so every future can start at once
    val executorService  = Executors.newFixedThreadPool(1000)
    val executionContext = ExecutionContext.fromExecutorService(executorService)

    // The execution context is passed explicitly instead of importing the global one
    def go(implicit ec: ExecutionContext) = {
      val list = Seq(
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")},
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}
      )
      Future.sequence(list)
    }
    try Await.result(go(executionContext), Duration.Inf)
    // Pool threads are non-daemon; without shutdown the JVM keeps running after main returns
    finally executorService.shutdown()
  }
}
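To check that a pool really gives the parallelism you expect, you can count how many tasks are inside their body at the same moment. This is a rough sketch only; `PeakConcurrency` and `measure` are hypothetical names introduced here, and the reported peak depends on thread scheduling.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntBinaryOperator
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper; not from the original posts.
object PeakConcurrency {
  private val maxOp = new IntBinaryOperator {
    override def applyAsInt(a: Int, b: Int): Int = math.max(a, b)
  }

  // Runs `tasks` sleeping futures on a fixed pool of `poolSize` threads and
  // returns the largest number of tasks observed running at the same time.
  def measure(poolSize: Int, tasks: Int, sleepMillis: Long): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val running = new AtomicInteger(0)
    val peak    = new AtomicInteger(0)
    try {
      val futures = (1 to tasks).map { _ =>
        Future {
          val now = running.incrementAndGet() // this task has started
          peak.accumulateAndGet(now, maxOp)   // remember the highest count seen
          Thread.sleep(sleepMillis)
          running.decrementAndGet()           // this task is about to finish
        }
      }
      Await.result(Future.sequence(futures), Duration.Inf)
      peak.get()
    } finally pool.shutdown()                 // let the JVM exit afterwards
  }

  def main(args: Array[String]): Unit = {
    println(s"peak with 2 threads: ${measure(2, 5, 200L)}")
    println(s"peak with 5 threads: ${measure(5, 5, 200L)}")
  }
}
```

The peak can never exceed the pool size, so a pool of 2 threads reports at most 2 no matter how many futures are submitted.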

Upvotes: 1
