German
German

Reputation: 65

Akka and concurrent Actor execution

I have an actor (called Worker) which send a same message to 3 other Actors (called Filter1, Filter2, Filter3)

Each of this filters has a random time to resolve this action. Then, in the Worker actor, I use the ask pattern and wait the future success:

class Worker2 extends Actor with ActorLogging {

  val filter1 = context.actorOf(Props[Filter1], "filter1")
  val filter2 = context.actorOf(Props[Filter2], "filter2")
  val filter3 = context.actorOf(Props[Filter3], "filter3")

  implicit val timeout = Timeout(100.seconds)

  def receive = {
    case Work(t) =>

      val futureF3 = (filter3 ? Work(false)).mapTo[Response]
      val futureF2 = (filter2 ? Work(true)).mapTo[Response]
      val futureF1 = (filter1 ? Work(true)).mapTo[Response]

      val aggResult: Future[Boolean] =
        for {
          f3 <- futureF3
          f2 <- futureF2
          f1 <- futureF1
        } yield f1.reponse && f2.reponse && f3.reponse

      if (Await.result(aggResult, timeout.duration)) {
        log.info("Response: true")
        sender ! Response(true)
      } else {
        log.info("Response: false")
        sender ! Response(false)
      }
  }
}

If any of the Filter actors return false, then I don't need the other answers. For example, If I run in parallel the 3 Filter Actors, if in one case, Filter1 response false, the Work is solved and I don't need the answers of Filter2 and Filter3.

In this code, I always need to wait for the 3 executions to decide, that seems unnecessary. Is there a way to set up a short-circuit?

Upvotes: 3

Views: 496

Answers (2)

Bryan
Bryan

Reputation: 1441

A solution to this problem is to use Future.find() -- Scaladoc Here

You could solve it like this:

val failed = Future.find([f1,f2,f3]) { res => !res }
Await.result(failed, timeout.duration) match {
    None => // Success
    _ => // Failed
}

Future.find() will return the first future that completes and matches the predicate. If all futures have completed and none of the results match the predicate then it returns a None.

Edit:

A better solution would be to prevent blocking all together and use the akka pipe functionality to pipe the result directly to the sender when a response is found. This way your not blocking a thread using this actor:

import akka.pattern.pipe

val failed = Future.find([f1,f2,f3]) { res => !res }
val senderRef = sender
failed.map(res => Response(res.getOrElse(true))).pipeTo(senderRef)

In the getOrElse(true) part the result is false if we found a future just like before otherwise we return true.

Upvotes: 7

Ratan Sebastian
Ratan Sebastian

Reputation: 1892

I think what you want is to filter the future if the response is true. Because of the way the for expression works it will short circuit and won't bother waiting for the rest of the futures to complete in order to assemble the the response. It will still return a failed future with a MatchError exception (per [1]) which you need to handle using an onFailure handler

so

val aggResult = for {
  f3 <- futureF3 if (f3.response)
  f2 <- futureF2 if (f2.response)
  f1 <- futureF1 if (f1.response)
} yield f1.reponse && f2.reponse && f3.reponse

aggResult.onFailure { case MatchError => sender ! false } 

[1] : https://groups.google.com/forum/#!msg/akka-user/oCBpAMRekks/X4y0QV-oOPYJ

Upvotes: 1

Related Questions