Reputation: 65
I have an actor (called Worker) which send a same message to 3 other Actors (called Filter1, Filter2, Filter3)
Each of this filters has a random time to resolve this action. Then, in the Worker actor, I use the ask pattern and wait the future success:
class Worker2 extends Actor with ActorLogging {
val filter1 = context.actorOf(Props[Filter1], "filter1")
val filter2 = context.actorOf(Props[Filter2], "filter2")
val filter3 = context.actorOf(Props[Filter3], "filter3")
implicit val timeout = Timeout(100.seconds)
def receive = {
case Work(t) =>
val futureF3 = (filter3 ? Work(false)).mapTo[Response]
val futureF2 = (filter2 ? Work(true)).mapTo[Response]
val futureF1 = (filter1 ? Work(true)).mapTo[Response]
val aggResult: Future[Boolean] =
for {
f3 <- futureF3
f2 <- futureF2
f1 <- futureF1
} yield f1.reponse && f2.reponse && f3.reponse
if (Await.result(aggResult, timeout.duration)) {
log.info("Response: true")
sender ! Response(true)
} else {
log.info("Response: false")
sender ! Response(false)
}
}
}
If any of the Filter actors return false, then I don't need the other answers. For example, If I run in parallel the 3 Filter Actors, if in one case, Filter1 response false, the Work is solved and I don't need the answers of Filter2 and Filter3.
In this code, I always need to wait for the 3 executions to decide, that seems unnecessary. Is there a way to set up a short-circuit?
Upvotes: 3
Views: 496
Reputation: 1441
A solution to this problem is to use Future.find() -- Scaladoc Here
You could solve it like this:
val failed = Future.find([f1,f2,f3]) { res => !res }
Await.result(failed, timeout.duration) match {
None => // Success
_ => // Failed
}
Future.find() will return the first future that completes and matches the predicate. If all futures have completed and none of the results match the predicate then it returns a None.
Edit:
A better solution would be to prevent blocking all together and use the akka pipe functionality to pipe the result directly to the sender when a response is found. This way your not blocking a thread using this actor:
import akka.pattern.pipe
val failed = Future.find([f1,f2,f3]) { res => !res }
val senderRef = sender
failed.map(res => Response(res.getOrElse(true))).pipeTo(senderRef)
In the getOrElse(true) part the result is false if we found a future just like before otherwise we return true.
Upvotes: 7
Reputation: 1892
I think what you want is to filter the future if the response is true. Because of the way the for expression works it will short circuit and won't bother waiting for the rest of the futures to complete in order to assemble the the response. It will still return a failed future with a MatchError exception (per [1]) which you need to handle using an onFailure handler
so
val aggResult = for {
f3 <- futureF3 if (f3.response)
f2 <- futureF2 if (f2.response)
f1 <- futureF1 if (f1.response)
} yield f1.reponse && f2.reponse && f3.reponse
aggResult.onFailure { case MatchError => sender ! false }
[1] : https://groups.google.com/forum/#!msg/akka-user/oCBpAMRekks/X4y0QV-oOPYJ
Upvotes: 1