Michael
Michael

Reputation: 42050

Future with Timeout in Scala

Suppose I have a function, which invokes a blocking interruptible operation. I would like to run it asynchronously with a timeout. That is, I would like to interrupt the function when the timeout is expired.

So I am trying to do something like that:

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); aref.get().interrupt} // 1
  Future {aref.set(Thread.currentThread); Try(f())}    // 2
}

The problem is that aref in (1) can be null because (2) has not set it to the current thread yet. In this case I would like to wait until aref is set. What is the best way to do that ?

Upvotes: 24

Views: 24729

Answers (5)

flavian
flavian

Reputation: 28511

You can go for a slightly easier approach using Await. The Await.result method takes timeout duration as a second parameter and throws a TimeoutException on timeout.

try {
  import scala.concurrent.duration._
  Await.result(aref, 10 seconds);
} catch {
    case e: TimeoutException => // whatever you want to do.
}

Upvotes: 17

anshumans
anshumans

Reputation: 4095

I needed the same behavior as well, so this is how I solved it. I basically created an object that creates a timer and fails the promise with a TimeoutException if the future hasn't completed in the specified duration.

package mypackage

import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutFuture {

  val actorSystem = ActorSystem("myActorSystem")
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    val promise = Promise[A]()
    actorSystem.scheduler.scheduleOnce(timeout) {
      promise tryFailure new java.util.concurrent.TimeoutException
    }

    Future {
      try {
        promise success block
      }
      catch {
        case e:Throwable => promise failure e
      } 
    }

    promise.future
  }
}

Upvotes: 6

drexin
drexin

Reputation: 24413

Although you already got some answers on how to achieve it with blocking the additional thread to handle the timeout, I would suggest you to try a different way, for the reason Rex Kerr already gave. I don't exactly know, what you are doing in f(), but if it is I/O bound, I would suggest you to just use an asynchronous I/O library instead. If it is some kind of loop, you could pass the timeout value directly into that function and throw a TimeoutException there, if it exceeds the timeout. Example:

import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

def doSth(timeout: Deadline) = {
  for {
    i <- 0 to 10
  } yield {
    Thread.sleep(1000)
    if (timeout.isOverdue)
      throw new TimeoutException("Operation timed out.")

    i
  }
}

scala> future { doSth(12.seconds.fromNow) }
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@3d104456

scala> Await.result(res3, Duration.Inf)
res6: scala.collection.immutable.IndexedSeq[Int] =
  Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> future { doSth(2.seconds.fromNow) }
res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@f7dd680

scala> Await.result(res7, Duration.Inf)
java.util.concurrent.TimeoutException: Operation timed out.
    at $anonfun$doSth$1.apply$mcII$sp(<console>:17)
    at $anonfun$doSth$1.apply(<console>:13)
    at $anonfun$doSth$1.apply(<console>:13)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    ...

scala> res7.value
res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] =
  Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))

This will only use only 1 thread, that will be terminated after timeout + execution time of a single step.

Upvotes: 3

cmbaxter
cmbaxter

Reputation: 35443

You could also try using a CountDownLatch like this:

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  val latch = new CountDownLatch(1)
  Future {
    latch.await()
    aref.get().interrupt
  }

  Future {
    aref.set(Thread.currentThread) 
    latch.countDown()
    Try(f())
  }
}

Now I'm waiting forever with my call to latch.await(), but you could certainly change that to:

latch.await(1, TimeUnit.SECONDS)

and then wrap it with a Try to handle if when/if it times out.

Upvotes: 1

Rex Kerr
Rex Kerr

Reputation: 167871

If you add a CountDownLatch you can achieve the behavior you want. (Note that blocking (i.e. getting stuck at await) in lots and lots of Futures may lead to starvation of thread pools.)

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
  val cdl = new java.util.concurrent.CountDownLatch(1)

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt}   // 1
  Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())}  // 2
}

Upvotes: 5

Related Questions