gilad hoch

Reputation: 2866

Waiting for "recursive" futures in Scala

A simple code sample that describes my problem:

import scala.util._
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

class LoserException(msg: String, dice: Int) extends Exception(msg) { def diceRoll: Int = dice }

def aPlayThatMayFail: Future[Int] = {
    Thread.sleep(1000) //throwing a dice takes some time...
    //throw a dice:
    (1 + Random.nextInt(6)) match {
        case 6 => Future.successful(6) //I win!
        case i: Int => Future.failed(new LoserException("I did not get 6...", i))
    }
}

def win(prefix: String): String = {
    val futureGameLog = aPlayThatMayFail
    futureGameLog.onComplete(t => t match {
        case Success(diceRoll) => "%s, and finally, I won! I rolled %d !!!".format(prefix, diceRoll)
        case Failure(e) => e match {
            case ex: LoserException => win("%s, and then i got %d".format(prefix, ex.diceRoll))
            case _: Throwable => "%s, and then somebody cheated!!!".format(prefix)
        }
    })
"I want to do something like futureGameLog.waitForRecursiveResult, using Await.result or something like that..."
}

win("I started playing the dice")   

This simple example illustrates what I want to do. Put into words: I want to wait for the result of a computation in which each step is chosen based on whether the previous attempt succeeded or failed.

So how would you implement the win method?

My "real world" problem, if it makes any difference, is using dispatch for asynchronous HTTP calls, where I want to keep making HTTP calls whenever the previous one ends, but the next action differs depending on whether the previous HTTP call succeeded or not.

Upvotes: 4

Views: 2506

Answers (3)

kuabhina1702

Reputation: 79

This works for me:

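// Assumption: `after` and `Scheduler` come from Akka (the answer does not say).
import akka.actor.Scheduler
import akka.pattern.after

def retryWithFuture[T](f: => Future[T], retries: Int, delay: FiniteDuration)(implicit ec: ExecutionContext, s: Scheduler): Future[T] =
    f.recoverWith { case _ if retries > 0 => after[T](delay, s)(retryWithFuture[T](f, retries - 1, delay)) }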
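
For readers who don't have a scheduler at hand, here is a minimal sketch of the same recoverWith idea using only the standard library. It drops the delay between attempts, and the names retry, flaky and demo are made up for illustration. Once retries reaches 0 the guard no longer matches, so the last failure is passed through unchanged.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger

object RetrySketch {
  // Re-evaluates the by-name `f` after a failure, up to `retries` more times.
  // There is no delay between attempts in this sketch.
  def retry[T](f: => Future[T], retries: Int)(implicit ec: ExecutionContext): Future[T] =
    f.recoverWith { case _ if retries > 0 => retry(f, retries - 1) }

  // Hypothetical flaky operation: fails on the first two calls, then succeeds.
  // Returns (result, number of attempts made).
  def demo()(implicit ec: ExecutionContext): (Int, Int) = {
    val attempts = new AtomicInteger(0)
    def flaky: Future[Int] = {
      val n = attempts.incrementAndGet()
      if (n < 3) Future.failed(new RuntimeException(s"attempt $n failed"))
      else Future.successful(n)
    }
    val result = Await.result(retry(flaky, retries = 5), 5.seconds)
    (result, attempts.get)
  }
}
```

Calling RetrySketch.demo() with an implicit ExecutionContext in scope makes three attempts in total and yields the value from the third one.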

Upvotes: 1

cmbaxter

Reputation: 35443

As drexin answered the part about exception handling and recovering, let me try and answer the part about a recursive function involving futures. I believe using a Promise will help you achieve your goal. The restructured code would look like this:

def win(prefix: String): String = {
    val prom = Promise[String]()

    // Each attempt carries the log so far in `p`; the promise is completed only at the end.
    def doWin(p: String): Unit = {
      val futureGameLog = aPlayThatMayFail
      futureGameLog.onComplete {
          case Success(diceRoll) => prom.success("%s, and finally, I won! I rolled %d !!!".format(p, diceRoll))
          case Failure(e) => e match {
              case ex: LoserException => doWin("%s, and then i got %d".format(p, ex.diceRoll))
              case other => prom.failure(new Exception("%s, and then somebody cheated!!!".format(p)))
          }
      }
    }
    doWin(prefix)
    // someTimeout is a placeholder: supply a Duration of your choosing
    Await.result(prom.future, someTimeout)
}

Now this won't be true recursion in the sense of building up one long stack, because the futures are async, but it is similar to recursion in spirit. Using the promise here gives you something to block against while the recursion does its thing, shielding the caller from what's happening behind the scenes.

Now, if I were doing this, I would probably redefine things like so:

def win(prefix: String): Future[String] = {
    val prom = Promise[String]()

    // Each attempt carries the log so far in `p`; the promise is completed only at the end.
    def doWin(p: String): Unit = {
      val futureGameLog = aPlayThatMayFail
      futureGameLog.onComplete {
          case Success(diceRoll) => prom.success("%s, and finally, I won! I rolled %d !!!".format(p, diceRoll))
          case Failure(e) => e match {
              case ex: LoserException => doWin("%s, and then i got %d".format(p, ex.diceRoll))
              case other => prom.failure(new Exception("%s, and then somebody cheated!!!".format(p)))
          }
      }
    }
    doWin(prefix)
    prom.future
}

This way you can defer the decision on whether to block or use async callbacks to the caller of this function. This is more flexible, but it also exposes the caller to the fact that you are doing async computations and I'm not sure that is going to be acceptable for your scenario. I'll leave that decision up to you.
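
To make the two options concrete, here is a rough sketch of what the caller's side could look like. The win below is only a stand-in that completes immediately so the example is self-contained; blocking and nonBlocking are hypothetical names, and the 10 second timeout is an arbitrary choice.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CallerChoice {
  // Stand-in for the Future-returning `win`; it completes immediately
  // so that this sketch does not depend on the dice code.
  def win(prefix: String): Future[String] =
    Future.successful("%s, and finally, I won! I rolled %d !!!".format(prefix, 6))

  // Option 1: the caller blocks until the game log is available.
  def blocking(): String =
    Await.result(win("I started playing the dice"), 10.seconds)

  // Option 2: the caller stays asynchronous and keeps composing on the future.
  def nonBlocking()(implicit ec: ExecutionContext): Future[String] =
    win("I started playing the dice").map(log => log + " Game over.")
}
```

Either way win itself stays the same; only the call site decides whether a thread is parked in Await.result.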

Upvotes: 2

drexin

Reputation: 24413

You can recover your failed future with a recursive call:

def foo(x: Int) = x match {
  case 10 => Future.successful(x)
  case _ => Future.failed[Int](new Exception)
}

def bar(x: Int): Future[Int] = {
  foo(x) recoverWith { case _ => bar(x+1) }
}

scala> bar(0)
res0: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@64d6601

scala> res0.value
res1: Option[scala.util.Try[Int]] = Some(Success(10))

recoverWith takes a PartialFunction[Throwable, scala.concurrent.Future[A]] and returns a Future[A]. You should be careful though, because a long chain of recursive calls like this can use quite a lot of memory.
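
Applied to the dice game from the question, the same idea might look roughly like the sketch below. The roll parameter is an assumption added here so the dice can be fixed for a repeatable run; play and demo are illustrative names. Any failure other than LoserException is simply left in the returned future.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DiceWithRecover {
  class LoserException(msg: String, val diceRoll: Int) extends Exception(msg)

  // `roll` stands in for the random dice (hypothetical parameter).
  def play(roll: () => Int): Future[Int] = roll() match {
    case 6 => Future.successful(6) // I win!
    case i => Future.failed(new LoserException("I did not get 6...", i))
  }

  // On a win, format the final log; on a losing roll, recurse with a longer prefix.
  def win(prefix: String, roll: () => Int)(implicit ec: ExecutionContext): Future[String] =
    play(roll)
      .map(d => "%s, and finally, I won! I rolled %d !!!".format(prefix, d))
      .recoverWith {
        case ex: LoserException =>
          win("%s, and then i got %d".format(prefix, ex.diceRoll), roll)
      }

  // Fixed rolls 2, 4, 6 so the run is repeatable.
  def demo()(implicit ec: ExecutionContext): String = {
    val rolls = Iterator(2, 4, 6)
    Await.result(win("I started playing the dice", () => rolls.next()), 5.seconds)
  }
}
```

With the fixed rolls, demo() returns "I started playing the dice, and then i got 2, and then i got 4, and finally, I won! I rolled 6 !!!".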

Upvotes: 6
