Reputation: 6422
I have an actor the receives a message and runs two futures. Those futures can run in parallel, so I thought I could use a for comprehension to run both futures and combine their result into one response to the sender. I can get the results from each by themselves, but I don't know what to do to get aggregate them when they are both complete.
def receive = {
case "pcbStatus" => {
val currentSender = sender
//first future
val wsf = (self ? "workhorseStats")(5 seconds)
val psf = Future.traverse(context.children)(x => {
(x ? "reportStatus")(5 seconds)
});
val combined = for {
r1 <- wsf
r2 <- psf
} yield (r1, r2)
combined.onComplete {
case Success(result:Any) => {
val response = new SomeCaseClass(r1,r2)
println("YAY: " + response)
currentSender ! response
}
case Failure(failure) => {
println("FAIL: " + failure)
}
}
}
}
Upvotes: 2
Views: 1291
Reputation: 35443
I've coded a little example of what I think it is you are trying to do. First, the two actor classes:
class ParentActor extends Actor{
import context._
import akka.pattern.pipe
implicit val timeout = Timeout(5 seconds)
override def preStart = {
context.actorOf(Props[ChildActor], "child-a")
context.actorOf(Props[ChildActor], "child-b")
}
def receive = {
case "foo" =>
val fut1 = (self ? "bar").mapTo[Int]
val fut2 = Future.traverse(context.children)(child => (child ? "baz").mapTo[Int])
val aggFut = for{
f1 <- fut1
f2 <- fut2
} yield SomeResult(f1, f2.toList)
aggFut pipeTo sender
case "bar" =>
sender ! 2
}
}
class ChildActor extends Actor{
def receive = {
case "baz" =>
sender ! 1
}
}
Then you could test it with this code:
implicit val timeout = Timeout(5 seconds)
val system = ActorSystem("foo")
val actor = system.actorOf(Props[ParentActor])
val result = actor ? "foo"
import system._
result onComplete{
case tr => println(tr)
}
When you run this, it should print Success(SomeResult(2,List(1, 1)))
.
A couple of things here. First, using mapTo
allows the types to be known as opposed to having to deal with Any
. Second, pipeTo
is a good option here to avoid closing over the sender and it also simplifies the code a bit.
Upvotes: 4
Reputation: 5326
There is a trivial way to combine Futures. For example (without akka):
import scala.concurrent.ExecutionContext.Implicits.global
val promiseInt = Promise[Int]
val promiseString = Promise[String]
val futureInt = promiseInt.future
val futureString = promiseString.future
case class Special(i: Int, s: String)
futureInt.onSuccess { case(i) =>
futureString.onSuccess { case(s) =>
println(Special(i, s))
}
}
promiseInt.success(3)
promiseString.success("no")
Thread.sleep(100)
The order in which the two futures are completed is irrelevant. You can try inverting the two success triggers and you will get the same result.
I am using Promise
here only to build a running example; it has nothing to do with combining the Futures.
Upvotes: 3