Steve Waldman

Reputation: 14083

Tail-recursive, functional polling in Scala

I have a function that looks for an Elephant over a network, returning a Future[Option[Elephant]]. The return value is a Future so that the function can return immediately while the network call happens asynchronously. It contains an Option for which None means not yet available, while Some means the elephant has been found:

def checkForElephant : Future[Option[Elephant]] = ???

What I'd like to do is write a function called pollForElephant.

def pollForElephant : Future[Elephant] = ???

This function should return a Future over a process that calls checkForElephant and succeeds quickly if an elephant is found on the first check, but otherwise checks again every 10 seconds until an Elephant is found, even if there are no elephants and it has to try forever.

The easy way to do this is just to force the check to be synchronous, write a recursive function outside of the Future domain to poll, and then create a Future over the whole affair:

import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class Elephant;

def checkForElephant : Future[Option[Elephant]] = ???

def synchronousCheckForElephant : Option[Elephant] = blocking {
  Await.result( checkForElephant, Duration.Inf )
}

@tailrec
def poll( last : Option[Elephant] ) : Elephant = {
  last match {
    case Some( elephant ) => elephant
    case None => {
      blocking {
        Thread.sleep( 10.seconds.toMillis )
      }
      poll( synchronousCheckForElephant )
    }
  }
}

def pollForElephant : Future[Elephant] = Future {
   poll( synchronousCheckForElephant )
}

This seems terribly inelegant, to start from the Future domain, force into synchrony, and then go back. I thought I should be able to do everything from the Future. So, I tried this:

import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class Elephant;

def checkForElephant : Future[Option[Elephant]] = ???

// oops! this is not @tailrec
def poll( last : Future[Option[Elephant]] ) : Future[Elephant] = {
  last.flatMap { mbElephant =>
    mbElephant match {
      case Some( elephant ) => Future.successful( elephant )
      case None => {
        blocking {
          Thread.sleep( 10.seconds.toMillis )
        }
        poll( checkForElephant )
      }
    }
  }
}

def pollForElephant : Future[Elephant] = poll( checkForElephant )

Unfortunately, as the comment above says, the poll(...) function is not tail recursive. Elephants might take a long time to arrive, and I'm supposed to wait indefinitely, but the stack might blow.

And the whole thing feels a little weird. Should I just revert to the easier-to-reason-about synchronous approach? Is there a safe way to do what I mean to while staying in the Future?

Upvotes: 1

Views: 1464

Answers (1)

SergGr

Reputation: 23788

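I agree with @PH88's comment: the call doesn't need to be tail recursive. The recursive poll call happens inside the flatMap callback, which runs later on the execution context once the previous Future has completed, so each check starts on a fresh stack instead of on top of the previous one. Here is some simple code I used to mock your checkForElephant: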

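import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

type Elephant = String
val rnd = new Random()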

def checkForElephant: Future[Option[Elephant]] = Future({
  val success = rnd.nextDouble() < 0.2
  println(s"Call to checkForElephant => $success")
  if (success) Some(Thread.currentThread().getStackTrace().mkString("\n")) else None
})

def poll(last: Future[Option[Elephant]]): Future[Elephant] = {
  last flatMap {
    case Some(elephant) => Future.successful(elephant)
    case None => {
      blocking {
        println("Sleeping")
        Thread.sleep(100.millisecond.toMillis)
      }
      poll(checkForElephant)
    }
  }
}

def pollForElephant: Future[Elephant] = poll(checkForElephant)

val result = Await.result(pollForElephant, Duration.Inf)
println(result)

And here is the output from one of the runs:

Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => true
java.lang.Thread.getStackTrace(Thread.java:1556)
so.TestApp$$anonfun$so$TestApp$$checkForElephant$1$1.apply(TestApp.scala:97)
so.TestApp$$anonfun$so$TestApp$$checkForElephant$1$1.apply(TestApp.scala:94)
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

As you can see, even though checkForElephant returned None the first 7 times, the stack trace is shallow.
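For a more aggressive check than seven iterations, here is a minimal sketch that drives the same flatMap recursion through many more steps. The names FlatMapDepthCheck, check and the attempts counter are made up for illustration, and the sleep is left out so the run finishes quickly; if the recursion were really stacking frames, a run of this length would be expected to end in a StackOverflowError.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FlatMapDepthCheck {
  // Hypothetical stand-in for checkForElephant: answers "not yet" (None)
  // until `remaining` reaches 0.
  def check(remaining: Int): Future[Option[Int]] =
    Future(if (remaining == 0) Some(0) else None)

  // Same shape as poll above, minus the sleep. `attempts` counts the checks made so far.
  def poll(remaining: Int, attempts: Int): Future[Int] =
    check(remaining).flatMap {
      case Some(_) => Future.successful(attempts)
      case None    => poll(remaining - 1, attempts + 1) // runs inside the callback, on a fresh stack
    }

  def main(args: Array[String]): Unit = {
    // 100000 "not yet" answers followed by one success
    val attempts = Await.result(poll(100000, 1), 1.minute)
    println(s"found after $attempts checks")
  }
}
```

This only exercises the stack question on the global execution context; it says nothing about how long a real network check takes.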

Sidenote

What I don't like about your approach is that you block a thread whenever you sleep for 10 seconds. This seems rather inefficient to me. If you want to have many such calls, you might consider using something smarter, such as Java's ScheduledThreadPoolExecutor or Akka actors.
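One way to keep the flatMap style without parking a thread during the wait is to turn the delay itself into a Future. The following is only a sketch under assumptions: NonBlockingPoll, delay and pollFor are hypothetical names, and the check is passed in as a function so the code does not depend on a real checkForElephant.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object NonBlockingPoll {
  // A single scheduler thread that only fires timers; the checks run elsewhere.
  // It is a non-daemon thread, so a real application would shut it down on exit.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // Hypothetical helper: a Future that completes after `d` without blocking any thread.
  def delay(d: FiniteDuration): Future[Unit] = {
    val p = Promise[Unit]()
    scheduler.schedule(
      new Runnable { def run(): Unit = p.success(()) },
      d.toMillis,
      TimeUnit.MILLISECONDS
    )
    p.future
  }

  // Poll until `check` yields Some, waiting `interval` between attempts.
  // As in the original poll, a failed check fails the resulting Future.
  def pollFor[A](check: () => Future[Option[A]], interval: FiniteDuration)
                (implicit ec: ExecutionContext): Future[A] =
    check().flatMap {
      case Some(a) => Future.successful(a)
      case None    => delay(interval).flatMap(_ => pollFor(check, interval))
    }
}
```

With this in place, pollForElephant could be written as NonBlockingPoll.pollFor(() => checkForElephant, 10.seconds), given an implicit ExecutionContext in scope.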

Update

But would it leak memory, the logical equivalent of stack frames, maintained as objects on the heap?

No, it should not, unless you have something very strange in your checkForElephant. For memory to leak, it has to be retained by some "root". Possible roots are static variables, thread-local variables, and the stack. As we know, the stack doesn't grow, so it can't be the source of a leak. If you don't mess with anything static and/or thread-local, you should be safe.

As to thread consumption, if you really have just one "elephant" in the system, I don't think there is anything significantly better. However, if your checkForElephant is actually checkForElephant(id), then you might consume a lot of threads for no good reason. A first step to improve this might be to use Promise and ScheduledThreadPoolExecutor (I'm not aware of a Scala equivalent for it) and sacrifice some functional style for better thread usage, such as:

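import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Just 1 thread should be enough, assuming checkForElephant schedules
// its Future on some executor rather than on the current thread
val scheduledExecutor = new ScheduledThreadPoolExecutor(1)

def pollForElephant: Future[Elephant] = {
  // Re-run poll after 10 seconds on the scheduler thread
  def scheduleDelayedPoll(p: Promise[Elephant]): Unit = {
    scheduledExecutor.schedule(new Runnable {
      override def run(): Unit = poll(p)
    }, 10, TimeUnit.SECONDS)
  }

  def poll(p: Promise[Elephant]): Unit = {
    checkForElephant.onComplete {
      case Success(Some(elephant)) => p.success(elephant)     // found: complete the promise
      case Success(None)           => scheduleDelayedPoll(p)  // not there yet: retry later
      case Failure(_)              => scheduleDelayedPoll(p)  // the check failed: retry later as well
    }
  }

  val p = Promise[Elephant]()
  poll(p)
  p.future
}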

If you have more load, the next step would be to use non-blocking I/O in your checkForElephant so that network requests don't block threads. If you actually use a web service, take a look at the Play WS API, a Scala wrapper around AsyncHttpClient, which is in turn based on Netty.

Upvotes: 1
