nairbv

Reputation: 4323

Idiomatic timeouts for long processes in multi-threaded Scala

I see a few questions on Stack Overflow asking, in one way or another, how to "kill" a future, à la the deprecated Thread.stop(). The answers explain why that's impossible, but they don't offer an alternative mechanism for solving similar problems.

For example: Practical use of futures? Ie, how to kill them?

I realize a future can't be "killed."

I know how I could do this the Java way: break up the task into smaller sleeps, and have some "volatile boolean isStillRunning" in a thread class which is periodically checked. If I've cancelled the thread by updating this value, the thread exits. This involves "shared state" (the isStillRunning var), and if I were to do the same thing in Scala it wouldn't seem very "functional."
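For reference, a minimal sketch of that flag-based approach written in Scala. The names CancellableTask, completedSteps and cancel are made up for illustration, and the sleep stands in for one small slice of real work. The @volatile annotation plays the same role as Java's volatile keyword.

```scala
// Hypothetical worker: performs its work in small steps and checks a shared
// flag between steps, so it can stop early when asked to.
class CancellableTask(steps: Int, stepMillis: Long) extends Runnable {
  // Same visibility guarantee as Java's `volatile boolean isStillRunning`.
  @volatile private var isStillRunning = true
  @volatile var completedSteps = 0

  // Called from another thread to request a cooperative stop.
  def cancel(): Unit = isStillRunning = false

  override def run(): Unit = {
    var i = 0
    while (i < steps && isStillRunning) {
      Thread.sleep(stepMillis) // placeholder for one slice of real work
      i += 1
      completedSteps = i
    }
  }
}

object CancellableTaskDemo {
  def main(args: Array[String]): Unit = {
    val task = new CancellableTask(steps = 1000, stepMillis = 10)
    val thread = new Thread(task)
    thread.start()
    Thread.sleep(50)
    task.cancel() // the worker notices the flag at its next check
    thread.join()
    println(s"stopped after ${task.completedSteps} of 1000 steps")
  }
}
```

The task only stops at the points where it checks the flag, so the step size bounds how long a cancel request can take to be honoured.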

What's the correct way to solve this sort of problem in idiomatic functional Scala? Is there a reasonably concise way to do it? Should I revert to "normal" threading and volatile flags? Should I use @volatile in the same way as the Java keyword?

Upvotes: 4

Views: 291

Answers (2)

nairbv

Reputation: 4323

I think I've found a better solution to my own problem. Instead of using a volatile variable to let an operation know when to die, I can send a higher-priority exit message to the actor. It looks something like this:

val a = new Actor() { 
  def act():Unit = {
    loop{ react {
      case "Exit" => exit() // exit() never returns, so no explicit return is needed
      case MyMessage => {
        // This check makes "Exit" a high-priority message: if "Exit"
        // is on the queue, it will be handled before proceeding to
        // handle the real message.
        receiveWithin(0) {
          case "Exit" => exit()
          case TIMEOUT => // nothing queued, carry on
        }
        }
        sender ! "hi!" //reply to sender
      }
    }}
  }
}

a.start()
val f = a !! MyMessage
while( ! f.isSet && a.getState != Actor.State.Terminated ) {
  //will probably return almost immediately unless the Actor was terminated
  //after I checked.
  Futures.awaitAll(100,f)
}
if( a.getState != Actor.State.Terminated ) {
  f() // the future should evaluate to "hi!"
}
a ! "Exit" // stops the actor from processing any more messages,
           // regardless of whether any are still on the queue.
a.getState // terminated

There's probably a cleaner way to write this, but that's approximately what I did in my application.

The receiveWithin(0) is an immediate no-op unless there is an "Exit" message on the queue. The queued "Exit" message replaces the volatile boolean I would have put in a threaded Java application.

Upvotes: 1

som-snytt

Reputation: 39577

Yes, it looks the same as in Java.

For a test rig, where a test can hang or run too long, I use a promise for failing the test (for any reason). For instance, a timeout monitor can "cancel" the test runner (interrupt the thread and compareAndSet a flag) and then complete the promise with failure. Or test prep can fail a badly configured test early. Or, the test runs and produces a result. At a higher level, the test rig just sees the future and its value.
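As a rough, self-contained sketch of the timeout-monitor idea (not the actual test rig): the helper name runWithTimeout and its parameters are assumptions made for illustration. Here promise.tryFailure doubles as the compare-and-set, since only the first completion of a promise wins, and the monitor interrupts the worker thread only if it won that race.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{Future, Promise}

object MonitoredRun {
  // Hypothetical helper: runs `body` on its own thread and returns a future
  // that fails with TimeoutException if no result arrives within `timeoutMillis`.
  def runWithTimeout[A](timeoutMillis: Long)(body: => A): Future[A] = {
    val promise = Promise[A]()
    val monitor = Executors.newSingleThreadScheduledExecutor()

    val worker = new Thread(new Runnable {
      override def run(): Unit =
        try { promise.trySuccess(body); () }
        catch { case t: Throwable => promise.tryFailure(t); () }
        finally monitor.shutdownNow() // the monitor is no longer needed
    })
    worker.setDaemon(true)

    // Schedule the monitor before starting the worker, so a fast worker
    // cannot shut the executor down before the task is submitted.
    monitor.schedule(new Runnable {
      override def run(): Unit = {
        // Only the first completion wins; interrupt only if the timeout won.
        if (promise.tryFailure(new TimeoutException(s"no result after $timeoutMillis ms")))
          worker.interrupt()
      }
    }, timeoutMillis, TimeUnit.MILLISECONDS)

    worker.start()
    promise.future
  }
}
```

Callers just see a Future[A]. Interruption is still cooperative: it only helps if the body blocks in an interruptible call or checks the thread's interrupt status.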

What is different from Java are your options for composing futures.

val all = Future.traverse(tests)(test => {
  val toKeep = promise[Result]    // promise to keep, or fail by monitor
  val f = for (w <- feed(prepare(test, toKeep))) yield {
    monitored(w, toKeep) {
      listener.start(w)
      w.runTest()
    }
  }
  f completing consume _
  // strip context
  val g = f map (r => r.copy(context = null))
  (toKeep completeWith g).future
})
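The snippet above depends on helpers from the test rig that aren't shown. As a smaller, standalone illustration of composing futures for a timeout, one option is to race the work against a future that fails after a delay, using Future.firstCompletedOf from the standard library. The names TimeoutRace, timeoutAfter and withTimeout are made up for this sketch.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object TimeoutRace {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical helper: a future that fails once `limit` has elapsed.
  // It parks a pool thread while waiting, which is acceptable for a sketch.
  def timeoutAfter[A](limit: FiniteDuration): Future[A] = Future[A] {
    blocking(Thread.sleep(limit.toMillis))
    throw new TimeoutException(s"gave up after $limit")
  }

  // Completes with whichever finishes first: the real work or the timeout.
  def withTimeout[A](work: Future[A], limit: FiniteDuration): Future[A] =
    Future.firstCompletedOf(Seq(work, timeoutAfter[A](limit)))
}
```

This only stops the caller from waiting. The losing future keeps running in the background, so work that must actually stop still needs a flag or an interrupt.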

Upvotes: 1
