Albert

Reputation: 2115

Scala: Wait for timeout of a sequence of futures and then collect the completed results

Situation:

There are a number of blocking synchronous calls (this is a given and cannot be changed), each of which can potentially take a long time, and their results need to be aggregated.

Goal:

Make the calls non-blocking, wait for a maximum time (in ms), and then collect the results of all calls that succeeded, even though some might have failed because they timed out (so we can degrade functionality for the failed calls).

Current solution:

The solution below combines the futures and waits for the combined future to either finish or time out. In the case of a NonFatal error (e.g. the timeout), it uses the completedFutureValue method to extract the values of the futures that completed successfully and falls back to a default for the rest.

  import scala.concurrent.{Await, Future}
  import scala.util.Random._
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success}
  import scala.util.control.NonFatal

  def potentialLongBlockingHelloWorld(i: Int): String = {Thread.sleep(nextInt(500)); s"hello world $i" }

  // use the same method 3 times, but in reality these are different methods (with different types)
  val futureHelloWorld1 = Future(potentialLongBlockingHelloWorld(1))
  val futureHelloWorld2 = Future(potentialLongBlockingHelloWorld(2))
  val futureHelloWorld3 = Future(potentialLongBlockingHelloWorld(3))

  val combinedFuture: Future[(String, String, String)] = for {
    hw1 <- futureHelloWorld1
    hw2 <- futureHelloWorld2
    hw3 <- futureHelloWorld3
  } yield (hw1, hw2, hw3)

  val res = try {
    Await.result(combinedFuture, 250.milliseconds)
  } catch {
    case NonFatal(_) => {
      (
        completedFutureValue(futureHelloWorld1, "fallback hello world 1"),
        completedFutureValue(futureHelloWorld2, "fallback hello world 2"),
        completedFutureValue(futureHelloWorld3, "fallback hello world 3")
      )
    }
  }

  def completedFutureValue[T](future: Future[T], fallback: T): T =
    future.value match {
      case Some(Success(value)) => value
      case Some(Failure(e)) =>
        fallback
      case None =>
        fallback
    }

It returns a Tuple3 containing, for each call, either the completed future's result or the fallback, for example: (hello world 1,fallback hello world 2,fallback hello world 3)

Although this works, I'm not particularly happy with this.

Question:

How can we improve on this?

Upvotes: 1

Views: 1317

Answers (5)

Albert

Reputation: 2115

Posting a solution provided by a colleague, which does essentially the same as the solution in the question but is much cleaner.

Using his solution one can write:

import scala.concurrent.duration._
import FallbackFuture.Recoverable
import FallbackFuture.Implicits._ // brings fallbackAfter into scope

(
  Recoverable(futureHelloWorld1, "fallback hello world 1"),
  Recoverable(futureHelloWorld2, "fallback hello world 2"),
  Recoverable(futureHelloWorld3, "fallback hello world 3")
).fallbackAfter(250.milliseconds) {
  case (hw1, hw2, hw3) =>
    // Do something with the results.
    println(hw1.value)
    println(hw2.value)
    println(hw3.value)
}

This works using tuples of futures with fallbacks. The code which makes this possible:

import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.language.implicitConversions
import scala.util.Try
import scala.util.control.NonFatal

sealed abstract class FallbackFuture[T] private(private val future: Future[T]) {
  def value: T
}

object FallbackFuture {
  final case class Recoverable[T](future: Future[T], fallback: T) extends FallbackFuture[T](future) {
    override def value: T = {
      if (future.isCompleted) future.value.flatMap(t => t.toOption).getOrElse(fallback)
      else fallback
    }
  }

  object Recoverable {
    def apply[T](fun: => T, fallback: T)(implicit ec: ExecutionContext): FallbackFuture[T] = {
      new Recoverable[T](Future(fun), fallback)
    }
  }

  final case class Irrecoverable[T](future: Future[T]) extends FallbackFuture[T](future) {
    override def value: T = {
      def except = throw new IllegalAccessException("Required future did not complete before timeout")
      if (future.isCompleted) future.value.flatMap(_.toOption).getOrElse(except)
      else except
    }
  }

  object Irrecoverable {
    def apply[T](fun: => T)(implicit ec: ExecutionContext): FallbackFuture[T] = {
      new Irrecoverable[T](Future(fun))
    }
  }

  object Implicits {
    private val logger = LoggerFactory.getLogger(Implicits.getClass)

    type FF[X] = FallbackFuture[X]

    implicit class Tuple2Ops[V1, V2](t: (FF[V1], FF[V2])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple3Ops[V1, V2, V3](t: (FF[V1], FF[V2], FF[V3])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple4Ops[V1, V2, V3, V4](t: (FF[V1], FF[V2], FF[V3], FF[V4])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple5Ops[V1, V2, V3, V4, V5](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple6Ops[V1, V2, V3, V4, V5, V6](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple7Ops[V1, V2, V3, V4, V5, V6, V7](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple8Ops[V1, V2, V3, V4, V5, V6, V7, V8](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple9Ops[V1, V2, V3, V4, V5, V6, V7, V8, V9](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    implicit class Tuple10Ops[V1, V2, V3, V4, V5, V6, V7, V8, V9, V10](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9], FF[V10])) {
      def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9], FF[V10])) => R): R =
        awaitAll(timeout, t) {
          fn(t)
        }
    }

    private implicit def toFutures(fallbackFuturesTuple: Product): Seq[Future[Any]] = {
      fallbackFuturesTuple.productIterator.toList
        .map(_.asInstanceOf[FallbackFuture[Any]])
        .map(_.future)
    }

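    private def awaitAll[R](timeout: Duration, futureSeq: Seq[Future[Any]])(fn: => R): R = {
      // Await.ready only throws when the timeout expires (or the thread is interrupted);
      // a failed future simply completes the sequence. Any other exception propagates.
      try Await.ready(Future.sequence(futureSeq), timeout)
      catch {
        case _: TimeoutException => logger.warn("Call timed out")
      }
      fn
    }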
  }
}

Upvotes: 1

Marko Švaljek

Reputation: 2101

If I may, I'd also suggest another approach: avoid blocking altogether and set a timeout on every individual future. Here is a blog post I found very useful while writing my example; it's kind of old, but gold:

https://nami.me/2015/01/20/scala-futures-with-timeout/

One downside is that you may need to add Akka to the project, but then again the result isn't too ugly:

  import akka.actor.ActorSystem
  import akka.pattern.after

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.{FiniteDuration, _}
  import scala.concurrent.{Await, Future, TimeoutException}
  import scala.util.Random._

  implicit val system = ActorSystem("theSystem")

  implicit class FutureExtensions[T](f: Future[T]) {
    def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
      Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
    }
  }

  def potentialLongBlockingHelloWorld(i: Int): String = {
    Thread.sleep(nextInt(500)); s"hello world $i"
  }

  implicit val timeout: FiniteDuration = 250.milliseconds

  val timeoutException = new TimeoutException("Future timed out!")

  // use the same method 3 times, but in reality these are different methods (with different types)
  val futureHelloWorld1 = Future(potentialLongBlockingHelloWorld(1)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 1") }
  val futureHelloWorld2 = Future(potentialLongBlockingHelloWorld(2)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 2") }
  val futureHelloWorld3 = Future(potentialLongBlockingHelloWorld(3)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 3") }

  val results = Seq(futureHelloWorld1, futureHelloWorld2, futureHelloWorld3)

  val combinedFuture = Future.sequence(results)

  // this is just to show what you would have in your future
  // combinedFuture is not blocking anything
  val justToShow = Await.result(combinedFuture, 1.seconds)
  println(justToShow)
  // some of my runs:
  // List(hello world 1, hello world 2, fallback hello world 3)
  // List(fallback hello world 1, fallback hello world 2, hello world 3)

With this approach the caller never blocks, and every future has its own timeout, so you can fine-tune each call to what you really need. The Await I'm using is only there to show how this works.
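If pulling in Akka only for its scheduler is a concern, the same `withTimeout` shape can be sketched with a plain JDK `ScheduledExecutorService` and a `Promise` in place of `akka.pattern.after`. This is a minimal sketch, not part of the answer above; `timer` and `slowCall` are hypothetical names, and the sleep times are chosen only so that one call finishes well inside the 250 ms timeout and the other does not.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise, TimeoutException, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical plain JDK timer standing in for Akka's system.scheduler; shut down at the end.
val timer = Executors.newSingleThreadScheduledExecutor()

implicit class FutureExtensions[T](f: Future[T]) {
  // Same shape as the Akka version: race f against a future that fails after `duration`.
  def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration): Future[T] = {
    val expired = Promise[T]()
    timer.schedule(new Runnable {
      def run(): Unit = expired.tryFailure(timeout)
    }, duration.toMillis, TimeUnit.MILLISECONDS)
    Future.firstCompletedOf(Seq(f, expired.future))
  }
}

// Hypothetical stand-in for a blocking call; `blocking` lets the global pool add threads.
def slowCall(i: Int, sleepMs: Long): String = blocking { Thread.sleep(sleepMs); s"hello world $i" }

implicit val timeout: FiniteDuration = 250.milliseconds
val timeoutException = new TimeoutException("Future timed out!")

val futureHelloWorld1 = Future(slowCall(1, 10)).withTimeout(timeoutException)
  .recover { case _ => "fallback hello world 1" }
val futureHelloWorld2 = Future(slowCall(2, 2000)).withTimeout(timeoutException)
  .recover { case _ => "fallback hello world 2" }

// Only for demonstration, as in the answer: both futures finish within about 250 ms.
val justToShow = Await.result(Future.sequence(Seq(futureHelloWorld1, futureHelloWorld2)), 1.second)
println(justToShow)
timer.shutdown()
```

Note that, as with the Akka version, the timed-out blocking call keeps running on its thread; the timeout only stops the caller from waiting for it.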

Upvotes: 1

Igor Yudnikov

Reputation: 464

Why not write:

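import scala.concurrent.Await
import scala.util.{Success, Try}

val futures = f1 :: f2 :: f3 :: Nil
// Try turns a timed-out (or failed) future into a Failure instead of throwing.
val results = futures map { f =>
    Try(Await.result(f, yourTimeOut))
}
results.collect {
    case Success(value) => /* your logic */
}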

???
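One caveat with awaiting each future in turn is that every `Await.result` may wait the full timeout, so N futures can block the caller for up to N × timeout. A minimal sketch of a variant, assuming a single overall time budget is what you want: take one `Deadline` up front and give each await only the time that is left. `slowCall` and the sleep times are hypothetical.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Hypothetical stand-in for the blocking calls; `blocking` lets the global pool add threads.
def slowCall(i: Int, sleepMs: Long): String = blocking { Thread.sleep(sleepMs); s"hello world $i" }

val futures: List[Future[String]] = List(
  Future(slowCall(1, 10)),
  Future(slowCall(2, 2000)),
  Future(slowCall(3, 20))
)

// One shared budget for all awaits instead of a full timeout per await.
val deadline: Deadline = 300.millis.fromNow

// Try turns a TimeoutException (or a failed call) into a Failure. Once the deadline
// has passed, timeLeft is negative and Await only checks whether f is already done.
val attempts: List[Try[String]] = futures.map(f => Try(Await.result(f, deadline.timeLeft)))

val completed: List[String] = attempts.collect { case Success(value) => value }
println(completed)
```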

Upvotes: 0

Evgeny

Reputation: 1770

Since (as I understand it) you are going to block the current thread anyway and wait for the results synchronously, I would say the easiest solution is:

import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.{Await, Future}
import scala.util.Random._
import scala.concurrent.ExecutionContext.Implicits.global

def potentialLongBlockingHelloWorld(i: Int): String = {Thread.sleep(nextInt(500)); s"hello world $i" }


// init with fallback
val result1 = new AtomicReference[String]("fallback hello world 1")
val result2 = new AtomicReference[String]("fallback hello world 2")
val result3 = new AtomicReference[String]("fallback hello world 3")

// use the same method 3 times, but in reality these are different methods (with different types)
val f1 = Future(potentialLongBlockingHelloWorld(1)).map {res =>
  result1.set(res)
}
val f2 = Future(potentialLongBlockingHelloWorld(2)).map {res =>
  result2.set(res)
}
val f3 = Future(potentialLongBlockingHelloWorld(3)).map {res =>
  result3.set(res)
}

for (i <- 1 to 5 if !(f1.isCompleted && f2.isCompleted && f3.isCompleted)) {
  Thread.sleep(50)
}

(result1.get(), result2.get(), result3.get())

Here, the results are kept in AtomicReferences that start out holding the fallbacks and are updated when each future completes; the loop then polls every 50 ms until either all futures have completed or the timeout (at most 5 ticks, i.e. about 250 ms) has elapsed.
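The polling loop keeps sleeping in 50 ms steps even if everything finished earlier. A sketch of the same idea using a `java.util.concurrent.CountDownLatch` instead of polling (a swapped-in technique, not part of the answer above): each future counts the latch down when it completes, and the caller waits on the latch for at most the timeout, so it wakes as soon as the last call finishes. `slowCall`, `track` and the sleep times are hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for the blocking calls; `blocking` lets the global pool add threads.
def slowCall(i: Int, sleepMs: Long): String = blocking { Thread.sleep(sleepMs); s"hello world $i" }

// Initialised with the fallbacks, overwritten when a call succeeds.
val result1 = new AtomicReference[String]("fallback hello world 1")
val result2 = new AtomicReference[String]("fallback hello world 2")
val result3 = new AtomicReference[String]("fallback hello world 3")

val latch = new CountDownLatch(3)

// Stores a successful value, then counts the latch down whether the call succeeded or failed.
def track[T](f: Future[T])(onSuccess: T => Unit): Unit =
  f.onComplete { t =>
    t.foreach(onSuccess)
    latch.countDown()
  }

track(Future(slowCall(1, 10)))(result1.set)
track(Future(slowCall(2, 2000)))(result2.set)
track(Future(slowCall(3, 20)))(result3.set)

// true if all three completed in time, false if the 250 ms timeout elapsed first.
val allDone: Boolean = latch.await(250, TimeUnit.MILLISECONDS)

val res = (result1.get(), result2.get(), result3.get())
println(res)
```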

Alternatively, you can take a Future-with-timeout implementation from here, extend it with a fallback, and then just use Future.sequence with Await, with the guarantee that every Future completes in time with either its result or its fallback.
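A minimal sketch of that alternative, assuming Scala 2.12+ and a plain JDK scheduler rather than the linked implementation: each call is raced against a timer that completes a `Promise` with the fallback value, so every wrapped future is guaranteed to succeed and a single `Await` on `Future.sequence` cannot lose the results that did finish. `withFallback`, `timer` and `slowCall` are hypothetical names.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical timer used only to fire the fallbacks; shut down at the end.
val timer = Executors.newSingleThreadScheduledExecutor()

// Completes with f's value if it succeeds within `d`; otherwise (timeout or failure)
// with `fallback`. The returned future never fails.
def withFallback[T](f: Future[T], d: FiniteDuration, fallback: T): Future[T] = {
  val p = Promise[T]()
  timer.schedule(new Runnable {
    def run(): Unit = p.trySuccess(fallback)
  }, d.toMillis, TimeUnit.MILLISECONDS)
  f.onComplete(t => p.trySuccess(t.getOrElse(fallback)))
  p.future
}

// Hypothetical stand-in for the blocking calls; `blocking` lets the global pool add threads.
def slowCall(i: Int, sleepMs: Long): String = blocking { Thread.sleep(sleepMs); s"hello world $i" }

val timeout = 250.millis
val combined: Future[List[String]] = Future.sequence(List(
  withFallback(Future(slowCall(1, 10)), timeout, "fallback hello world 1"),
  withFallback(Future(slowCall(2, 2000)), timeout, "fallback hello world 2"),
  withFallback(Future(slowCall(3, 20)), timeout, "fallback hello world 3")
))

// Every element completes within about `timeout`, so this generous Await should not expire.
val results = Await.result(combined, 1.second)
println(results)
timer.shutdown()
```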

Upvotes: 0

Igor Yudnikov

Reputation: 464

It's probably better to use Future.sequence(), which turns a Collection[Future[T]] into a Future[Collection[T]].
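For reference, a minimal example of what `Future.sequence` does, with illustrative values. Note that the combined future fails if any element fails, so on its own it does not keep the partial results the question asks for; mapping each future to a `Future[Try[T]]` with `transform` (Scala 2.12+) is one way to keep them.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Try}

val futures: List[Future[Int]] = List(Future(1), Future(2), Future(3))

// List[Future[Int]] => Future[List[Int]]
val all: Future[List[Int]] = Future.sequence(futures)
val values: List[Int] = Await.result(all, 1.second)

// One failing element fails the whole sequence...
val withFailure = List(Future(1), Future.failed[Int](new RuntimeException("boom")), Future(3))
val failedAll = Try(Await.result(Future.sequence(withFailure), 1.second))

// ...unless each future is first turned into a Future[Try[Int]] that always succeeds.
val kept: List[Try[Int]] =
  Await.result(Future.sequence(withFailure.map(_.transform(Success(_)))), 1.second)
val successes: List[Int] = kept.collect { case Success(v) => v }
```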

Upvotes: 0
