k0pernikus

Reputation: 66500

How to control the concurrency of Future.sequence in Scala?

I know that I can transform a Seq[Future[T]] into a Future[Seq[T]] via

  val seqFuture = Future.sequence(seqOfFutures)
  seqFuture.map((seqT: Seq[T]) => {...})

My problem is that I have 700 futures in that sequence, and I want to control how many of them are resolved in parallel. Each future calls an internal REST API, and sending 700 requests at the same time is like firing a DoS attack against that server.

I would rather have only something like 10 futures being resolved at a time.

How can I achieve that?


Trying pamu's answer I see the error:

[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error]         val batch = Future.sequence(c.map(_()))
[error]                                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : List[Nothing]
[error]  required: ?M[scala.concurrent.Future[?A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error]  found   : List[Nothing]
[error]  required: M[scala.concurrent.Future[A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                                          ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error]         val batch = Future.sequence(c.map(_()))
[error]                                    ^
[error] four errors found

Upvotes: 12

Views: 4323

Answers (2)

Nagarjuna Pamu

Reputation: 14825

FoldLeft

A simple foldLeft can be used to control how many futures run concurrently.

First, let's create a case class called LazyFuture:

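case class LazyFuture[+A](f: Unit => Future[A]) {
  // Starts the wrapped future; nothing runs until this is called.
  def apply(): Future[A] = f(())
}

object LazyFuture {
  // `new` calls the constructor directly, so the overloaded apply methods are not involved.
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = new LazyFuture[A](_ => Future(f))

  // No implicit parameter here: otherwise both overloads would have the same signature after erasure.
  def apply[A](f: => Future[A]): LazyFuture[A] = new LazyFuture[A](_ => f)
}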

LazyFuture keeps the wrapped future from starting until apply() is called.

val list: List[LazyFuture[A]] = ...


list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
  // Create the batch inside flatMap so it starts only after the previous batches (r) have completed.
  r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
}

Adjust concurFactor to set how many futures run concurrently:

a concurFactor of 1 runs one future at a time

a concurFactor of 2 runs two futures at a time

and so on ...

def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
   list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
      // Create the batch inside flatMap so it starts only once r has completed.
      r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
    }

Complete code

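  import scala.concurrent.{ExecutionContext, Future}

  case class LazyFuture[+A](f: Unit => Future[A]) {
    // Starts the wrapped future; nothing runs until this is called.
    def apply(): Future[A] = f(())
  }

  object LazyFuture {
    // `new` calls the constructor directly, so the overloaded apply methods are not involved.
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = new LazyFuture[A](_ => Future(f))

    // No implicit parameter here: otherwise both overloads would have the same signature after erasure.
    def apply[A](f: => Future[A]): LazyFuture[A] = new LazyFuture[A](_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // Create the batch inside flatMap so it starts only once r has completed.
      r.flatMap(rs => Future.sequence(c.map(_ ())).map(values => rs ++ values))
    }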

Limiting the execution context

You can also cap concurrency by limiting the number of threads in the execution context's pool. However, this solution is less flexible, and personally I do not like it.

val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

You have to remember to pass the correct execution context, which is an implicit value. It is not always obvious which implicit is in scope, so this approach is error-prone.

Warning

When a future is constructed like this:

val foo = Future {
     1 + 2
} // future starts executing

LazyFuture(foo) // wrong: foo is already running

foo has already started executing, so it can no longer be controlled.

The right way to construct a LazyFuture:

val foo = LazyFuture {
  1 + 2
}

or

val foo = LazyFuture {
  Future {
   1 + 2
  }
}
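
To make the warning concrete, here is a minimal sketch that observes when each body actually runs. The object name EagerVsDeferred and the helper observe are made up for this illustration, and a plain () => Future[Unit] thunk stands in for LazyFuture so the snippet compiles on its own.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, not part of the answer's code.
object EagerVsDeferred {
  // Returns (eager body ran, deferred body ran before the call, deferred body ran after the call).
  def observe(): (Boolean, Boolean, Boolean) = {
    val eagerRan    = new AtomicBoolean(false)
    val deferredRan = new AtomicBoolean(false)

    // A Future is scheduled as soon as it is constructed.
    val eager = Future { eagerRan.set(true) }
    // A thunk only describes the work; nothing is scheduled yet.
    val deferred: () => Future[Unit] = () => Future { deferredRan.set(true) }

    Await.result(eager, 5.seconds)
    val before = deferredRan.get()      // still false: the thunk was never invoked
    Await.result(deferred(), 5.seconds) // invoking the thunk starts the work
    (eagerRan.get(), before, deferredRan.get())
  }

  def main(args: Array[String]): Unit =
    println(observe())
}
```

The eager future runs without anyone asking for it, while the deferred one runs only after it is invoked. That is the property the batching code relies on.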

Working example

package main

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object Main {

  case class LazyFuture[A](f: Unit => Future[A]) {
    // Starts the wrapped future; nothing runs until this is called.
    def apply(): Future[A] = f(())
  }

  object LazyFuture {
    // `new` calls the constructor directly, so the overloaded apply methods are not involved.
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = new LazyFuture[A](_ => Future(f))
    def apply[A](f: => Future[A]): LazyFuture[A] = new LazyFuture[A](_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
    (implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // Create the batch inside flatMap so it starts only once r has completed.
      r.flatMap(rs => Future.sequence(c.map(_ ())).map(values => rs ++ values))
    }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global


    val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
      LazyFuture {
        println(s"value: $value started")
        Thread.sleep(value * 200)
        println(s"value: $value stopped")
        value
      }
    }
    val f = executeBatch(futures.toList)(2)
    Await.result(f, Duration.Inf)
  }

}
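
One way to check that batching really caps concurrency is to count how many tasks are in flight at once. The following is only a sketch under assumptions: the names BatchLimitCheck, inBatches and run are invented for this check, plain () => Future[A] thunks stand in for LazyFuture, and Thread.sleep stands in for the REST call.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical check, not part of the answer's code.
object BatchLimitCheck {
  // Runs the thunks in groups of `batchSize`; a group is started only after
  // every future of the previous group has completed.
  def inBatches[A](thunks: List[() => Future[A]], batchSize: Int)
                  (implicit ec: ExecutionContext): Future[List[A]] =
    thunks.grouped(batchSize).foldLeft(Future.successful(List.empty[A])) { (acc, group) =>
      acc.flatMap(done => Future.sequence(group.map(_.apply())).map(done ++ _))
    }

  // Returns the results and the highest number of tasks observed in flight at once.
  def run(taskCount: Int, batchSize: Int): (List[Int], Int) = {
    import ExecutionContext.Implicits.global
    val inFlight = new AtomicInteger(0)
    val maxSeen  = new AtomicInteger(0)
    val thunks: List[() => Future[Int]] = (1 to taskCount).toList.map { i => () =>
      Future {
        val now = inFlight.incrementAndGet()
        maxSeen.accumulateAndGet(now, (a, b) => math.max(a, b))
        Thread.sleep(20) // stand-in for the REST call
        inFlight.decrementAndGet()
        i
      }
    }
    val results = Await.result(inBatches(thunks, batchSize), 30.seconds)
    (results, maxSeen.get())
  }

  def main(args: Array[String]): Unit = {
    val (results, max) = run(taskCount = 20, batchSize = 3)
    println(s"results: $results, max in flight: $max")
  }
}
```

The observed maximum should never exceed batchSize. Note that a batch finishes only when its slowest future does, so one slow call holds up the start of the next batch.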

Upvotes: 7

Kolmar

Reputation: 14224

Concurrency of Scala Futures is controlled by the ExecutionContext. Note that futures start executing on the context immediately after creation, so the ExecutionContext of Future.sequence doesn't really matter. You have to supply the appropriate context when creating the original futures in the sequence.

The default context ExecutionContext.global (usually imported through import scala.concurrent.ExecutionContext.Implicits.global) uses as many threads as there are processor cores, but it can also create many additional threads for blocking tasks that are wrapped in scala.concurrent.blocking. This is usually the desired behaviour, but it's not suitable for your problem.

Fortunately, you can use the ExecutionContext.fromExecutor method to wrap a Java thread pool. For example:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

val context = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() }(context))
val sequenceFuture = Future.sequence(seqOfFutures)(ExecutionContext.global)

The context can also be provided implicitly of course:

implicit val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
// This `sequence` uses the same thread pool as the original futures
val sequenceFuture = Future.sequence(seqOfFutures) 
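
To see the cap in action, here is a small sketch that counts how many tasks run at the same time on a fixed pool. The names FixedPoolCheck and maxInFlight are invented for this check, and Thread.sleep stands in for a blocking REST call.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical check, not part of the answer's code.
object FixedPoolCheck {
  // Starts `taskCount` blocking tasks on a pool of `poolSize` threads and returns
  // the highest number of tasks observed running at the same time.
  def maxInFlight(taskCount: Int, poolSize: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val inFlight = new AtomicInteger(0)
    val maxSeen  = new AtomicInteger(0)
    try {
      // All futures are created (and queued) at once; the pool decides how many run.
      val futures = List.fill(taskCount)(Future {
        val now = inFlight.incrementAndGet()
        maxSeen.accumulateAndGet(now, (a, b) => math.max(a, b))
        Thread.sleep(20) // stand-in for a blocking REST call
        inFlight.decrementAndGet()
      })
      Await.result(Future.sequence(futures), 30.seconds)
      maxSeen.get()
    } finally pool.shutdown() // the pool's threads are non-daemon and would keep the JVM alive
  }

  def main(args: Array[String]): Unit =
    println(s"max in flight: ${maxInFlight(taskCount = 50, poolSize = 4)}")
}
```

The pool only limits work that occupies a thread for the duration of the call. If callRestApi() goes through a non-blocking HTTP client that returns its own Future right away, the thread is released immediately and the pool size no longer bounds the number of open requests.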

Upvotes: 4
