Greg

Reputation: 11542

What's a good way in Akka to wait for a group of Actors to respond?

In Akka, I want to send a "status" message to the actors in a cluster to ask for their status. These actors may be in various states of health, including dead or unable to respond.

I want to wait up to some time, say 10 seconds, then proceed with whatever results I happened to receive within that time limit. I don't want to fail the whole thing because 1 or 2 actors were having issues and didn't respond or timed out at 10 seconds.

I've tried this:

object GetStats {
  def unapply(n: ActorRef )(implicit system: ActorSystem): Option[Future[Any]] = Try {
    implicit val t: Timeout = Timeout(10 seconds)
    n ? "A" 
  }.toOption
}
...
val z = List(a,b,c,d)  // where a-d are ActorRefs to nodes I want to status
val q = z.collect {
   case GetStats(s) => s
}
// OK, so here 'q' is a List[Future[Any]]
val allInverted = Future.sequence(q) // now we have Future[List[Any]]
val ok =  Await.result(allInverted, 10 seconds).asInstanceOf[List[String]]
println(ok)

The problem with this code is that it seems to throw a TimeoutException if 1 or more don't respond. Then I can't get to the responses that did come back.

Upvotes: 4

Views: 1063

Answers (1)

dk14

Reputation: 22374

Assuming you really need to collect at least partial statistics every 10 seconds, the solution is to convert "not responding" into an actual failure.

To achieve this, make the Await timeout somewhat longer than the implicit val t: Timeout used for ask. The futures returned from ? will then fail on their own before Await gives up, so you can recover them:

// Works only when the ask timeout is shorter than the Await timeout
val qfiltered = q.map(_.map(Some(_)).recover{case _ => None}) //it's better to match TimeoutException here instead of `_`
val allInverted = Future.sequence(qfiltered).map(_.flatten)

Example:

scala> class MyActor extends Actor{ def receive = {case 1 => sender ! 2; case _ =>}}
defined class MyActor

scala> val a = sys.actorOf(Props[MyActor])
a: akka.actor.ActorRef = Actor[akka://1/user/$c#1361310022]

scala> implicit val t: Timeout = Timeout(1 seconds)
t: akka.util.Timeout = Timeout(1 second)

scala> val l = List(a ? 1, a ? 100500).map(_.map(Some(_)).recover{case _ => None})
l: List[scala.concurrent.Future[Option[Any]]] = List(scala.concurrent.impl.Promise$DefaultPromise@7faaa183, scala.concurrent.impl.Promise$DefaultPromise@1b51e0f0)

scala> Await.result(Future.sequence(l).map(_.flatten), 3 seconds)
warning: there were 1 feature warning(s); re-run with -feature for details
res29: List[Any] = List(2)

If you want to know which Future didn't respond, remove the flatten: the None entries then mark the requests that timed out.
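To illustrate keeping the None entries, here is a minimal sketch that uses only scala.concurrent, so it runs without Akka. Hand-made futures stand in for `actor ? msg`, and a future failed with a plain TimeoutException plays the role of a timed-out ask. The names `PartialResults`, `collectByName`, and the node labels "a" and "b" are hypothetical.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PartialResults {
  // Each reply becomes Some(value); a timed-out request becomes None.
  // Keeping the name next to the result shows which node stayed silent.
  def collectByName[A](asks: Map[String, Future[A]]): Future[Map[String, Option[A]]] =
    Future.sequence(
      asks.toList.map { case (name, f) =>
        f.map(v => (name, Option(v)))
          .recover { case _: TimeoutException => (name, None) }
      }
    ).map(_.toMap)

  def main(args: Array[String]): Unit = {
    // Stand-ins for `actor ? msg` (assumption: no real actors involved here)
    val asks = Map(
      "a" -> Future.successful("ok"),
      "b" -> Future.failed[String](new TimeoutException("no reply"))
    )
    val result = Await.result(collectByName(asks), 3.seconds)
    println(result)
  }
}
```

Matching only TimeoutException means any other failure still fails the combined future, which may or may not be what you want.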

Receiving a partial response should be enough for continuously collecting statistics: if some worker actor didn't respond in time, it will respond next time with actual data, and no data is lost. But you should correctly handle the worker's lifecycle and not lose (if it matters) any data held inside the actor itself.

If the timeouts are caused simply by high load on the system, you may consider:

  • a separate pool for workers
  • backpressure
  • caching for input requests (when the system is overloaded).

If the timeouts are caused by some remote storage, then a partial response is the correct way to handle it, provided the client is ready for that. A web UI, for example, may warn the user that the data shown may be incomplete by displaying a spinner. But don't forget not to block actors with storage requests (futures may help), or at least move those requests to a separate thread pool.
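As a rough sketch of the "separate thread pool" idea, again using only the standard library: blocking storage calls run on their own ExecutionContext, so they cannot starve the threads that process messages. The names `StoragePool`, `readFromStorage`, and `loadAsync`, the pool size of 4, and the simulated 50 ms delay are all hypothetical. In an Akka application a dedicated dispatcher would usually serve the same purpose.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StoragePool {
  // Daemon threads, so this pool never keeps the JVM alive on its own.
  private val daemonFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "storage-pool")
      t.setDaemon(true)
      t
    }
  }

  // Dedicated pool for slow storage calls (the size 4 is an arbitrary assumption).
  val storageEc: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4, daemonFactory))

  // Stand-in for a slow, blocking read from remote storage.
  def readFromStorage(key: String): String = {
    Thread.sleep(50)
    s"value-for-$key"
  }

  // Runs the blocking call on the dedicated pool and returns a Future immediately.
  def loadAsync(key: String): Future[String] =
    Future(readFromStorage(key))(storageEc)

  def main(args: Array[String]): Unit = {
    // Await is acceptable here because this is the outer edge of the program, not an actor.
    println(Await.result(loadAsync("stats"), 2.seconds))
  }
}
```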

If a worker actor didn't respond because of a failure (such as an exception), you can still send a notification to the sender from your preRestart, so you can also receive the reason why there are no statistics from that worker. The only caveat is that you should check whether the sender is available (it may not be).

P.S. I hope you don't call Await.result inside an actor: blocking an actor is not recommended, at the very least for the sake of your application's performance. In some cases it may even cause deadlocks or memory leaks. So any Await calls should be placed somewhere in the facade of your system (if the underlying framework doesn't support futures).

So it may make sense to process your answers asynchronously (you will still need to recover them from failure if some actor doesn't respond):

 //actor:
 val parent = sender // capture the sender now; it may change before the future completes
 for(list <- Future.sequence(qfiltered)) {
     parent ! process(list)
 }

 //in sender (outside of the actors):
 Await.result(actor ? Get, 10 seconds)

Upvotes: 4
