Reputation: 332
Suppose, we have an actor, which divides tasks into subtasks and delegates them to child actors. This actor does some own work, after it receives responses from all of subactors.
What's the best practice to manage with responses of subactors?
Put their results into, for example, Map<ActorRef, Object>
and check every time if all replies received?
Upvotes: 0
Views: 461
Reputation: 5
Helo, usually I implement it using List, or if You want Map, of objects type Either: Left and Right (to save information about each actor result - success or fail) and check length of map every time new message from child is received, and when all messages received, send to self message WorkDone or sth with this meaning.
Somewhere in Net I've seen pattern with separate actor created for collecting all messages form childs
superactor:
val n = 1000
val collector = system.actorOf(Props(classOf[Collector],n)
for( i<-1 to n){
system.actorOf(Child.props()).!(DoSomeWork)(collector)
}
collector
class Collector(expectedMsgs: Int) extends Actor {
var results = someCollection
def endCollecting() = {
// You can add here some logic before returning data
context.parent ! results.toList //or whatever you want
context.become(afterWork)
}
def receive = {
case r: DataDone =>
results += Right(r)
if(results.length>=expectedMsgs){
endCollecting()
}
case r: DataFail =>
results += Left(r)
if(results.length>=expectedMsgs){
endCollecting()
}
}
def afterWork = {
case _ =>
// for catch some late or mutiple messages
}
}
Also look at http://doc.akka.io/docs/akka/snapshot/contrib/aggregator.html
Upvotes: 0
Reputation: 22374
You can use Composing Futures. It's better to use it in combination with Routing for children. Something like:
import scala.concurrent._
import akka.pattern._
import akka.routing._
import akka.actor._
import akka.util._
import scala.concurrent.duration._
case class Req(i: Int)
case class Response(i: Int)
class Worker extends Actor { def receive = {case Req(i) => sender ! Response(i) }}
class Parent extends Actor {
import context.dispatcher
implicit val timeout = Timeout(10, MINUTES) //timeout for response from worker
val router: ActorRef =
context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router")
def processResults(rs: Seq[Response]) = {println(rs); rs}
def receive = {
case t: List[Req] =>
Future.sequence(t.map(router ? _).map(_.mapTo[Response])).map(processResults)
}
}
Results:
scala> ActorSystem().actorOf(Props(classOf[Parent])) ! List(Req(1), Req(2), Req(3))
List(Response(1), Response(2), Response(3))
Upvotes: 1