Reputation: 42050
Suppose I have a function, which invokes a blocking interruptible operation. I would like to run it asynchronously with a timeout. That is, I would like to interrupt the function when the timeout is expired.
So I am trying to do something like that:
import scala.util.Try import scala.concurrent.Future def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() import ExecutionContext.Implicits.global Future {Thread.sleep(timeout); aref.get().interrupt} // 1 Future {aref.set(Thread.currentThread); Try(f())} // 2 }
The problem is that aref
in (1) can be null because (2) has not set it to the current thread yet. In this case I would like to wait until aref
is set. What is the best way to do that ?
Upvotes: 24
Views: 24729
Reputation: 28511
You can go for a slightly easier approach using Await. The Await.result
method takes timeout duration as a second parameter and throws a TimeoutException
on timeout.
try {
import scala.concurrent.duration._
Await.result(aref, 10 seconds);
} catch {
case e: TimeoutException => // whatever you want to do.
}
Upvotes: 17
Reputation: 4095
I needed the same behavior as well, so this is how I solved it. I basically created an object that creates a timer and fails the promise with a TimeoutException if the future hasn't completed in the specified duration.
package mypackage
import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global
object TimeoutFuture {
val actorSystem = ActorSystem("myActorSystem")
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
val promise = Promise[A]()
actorSystem.scheduler.scheduleOnce(timeout) {
promise tryFailure new java.util.concurrent.TimeoutException
}
Future {
try {
promise success block
}
catch {
case e:Throwable => promise failure e
}
}
promise.future
}
}
Upvotes: 6
Reputation: 24413
Although you already got some answers on how to achieve it with blocking the additional thread to handle the timeout, I would suggest you to try a different way, for the reason Rex Kerr already gave. I don't exactly know, what you are doing in f()
, but if it is I/O bound, I would suggest you to just use an asynchronous I/O library instead. If it is some kind of loop, you could pass the timeout value directly into that function and throw a TimeoutException
there, if it exceeds the timeout. Example:
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
def doSth(timeout: Deadline) = {
for {
i <- 0 to 10
} yield {
Thread.sleep(1000)
if (timeout.isOverdue)
throw new TimeoutException("Operation timed out.")
i
}
}
scala> future { doSth(12.seconds.fromNow) }
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] =
scala.concurrent.impl.Promise$DefaultPromise@3d104456
scala> Await.result(res3, Duration.Inf)
res6: scala.collection.immutable.IndexedSeq[Int] =
Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> future { doSth(2.seconds.fromNow) }
res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] =
scala.concurrent.impl.Promise$DefaultPromise@f7dd680
scala> Await.result(res7, Duration.Inf)
java.util.concurrent.TimeoutException: Operation timed out.
at $anonfun$doSth$1.apply$mcII$sp(<console>:17)
at $anonfun$doSth$1.apply(<console>:13)
at $anonfun$doSth$1.apply(<console>:13)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
...
scala> res7.value
res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] =
Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))
This will only use only 1 thread, that will be terminated after timeout + execution time of a single step.
Upvotes: 3
Reputation: 35443
You could also try using a CountDownLatch
like this:
def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {
val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
import ExecutionContext.Implicits.global
val latch = new CountDownLatch(1)
Future {
latch.await()
aref.get().interrupt
}
Future {
aref.set(Thread.currentThread)
latch.countDown()
Try(f())
}
}
Now I'm waiting forever with my call to latch.await()
, but you could certainly change that to:
latch.await(1, TimeUnit.SECONDS)
and then wrap it with a Try
to handle if when/if it times out.
Upvotes: 1
Reputation: 167871
If you add a CountDownLatch
you can achieve the behavior you want. (Note that blocking (i.e. getting stuck at await
) in lots and lots of Future
s may lead to starvation of thread pools.)
import scala.util.Try
import scala.concurrent.Future
def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {
val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
val cdl = new java.util.concurrent.CountDownLatch(1)
import ExecutionContext.Implicits.global
Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt} // 1
Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())} // 2
}
Upvotes: 5